You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/05 03:44:41 UTC

[8/9] incubator-quickstep git commit: Adds CollisionFreeVectorTable to support a specialized fast-path aggregation for a range-bounded, single-integer group-by key; supports copy elision for aggregation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index d851a0c..cfaa663 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -21,15 +21,14 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_MAX_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -40,9 +39,8 @@
 
 namespace quickstep {
 
+class AggregationStateHashTableBase;
 class ColumnVector;
-class StorageManager;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -86,42 +84,41 @@ class AggregationHandleMax : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMax() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMax(type_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with max aggregation state.
-   */
   inline void iterateUnaryInl(AggregationStateMax *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(max_ptr, value);
+    compareAndUpdate(max_ptr, value);
   }
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  AggregationState* accumulateValueAccessor(
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  std::size_t getPayloadSize() const override {
+    return sizeof(TypedValue);
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
@@ -136,38 +133,21 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdate(max_ptr, argument);
+  }
 
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
-  }
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
-  inline TypedValue finalizeHashTableEntryFast(
+  inline TypedValue finalizeHashTableEntry(
       const std::uint8_t *byte_ptr) const {
     const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
     return TypedValue(*max_ptr);
@@ -175,29 +155,16 @@ class AggregationHandleMax : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MAX aggregation.
-   */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MAX aggregation.
-   */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionMax;
@@ -227,8 +194,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
-  inline void compareAndUpdateFast(TypedValue *max_ptr,
-                                   const TypedValue &value) const {
+  inline void compareAndUpdate(TypedValue *max_ptr,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
     if (max_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *max_ptr)) {
@@ -239,8 +206,6 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..08fb141 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -19,15 +19,16 @@
 
 #include "expressions/aggregation/AggregationHandleMin.hpp"
 
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
@@ -36,54 +37,32 @@
 
 namespace quickstep {
 
-class StorageManager;
+class ColumnVector;
 
 AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kMin),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kLess)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
-AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleMin::accumulateValueAccessor(
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const ValueAccessorMultiplexer &accessor_mux) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MIN: " << argument_ids.size();
 
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MIN: " << argument_ids.size();
+      accessor_mux.getValueAccessorBySource(argument_source),
+      argument_id));
 }
 
 void AggregationHandleMin::mergeStates(const AggregationState &source,
@@ -98,41 +77,37 @@ void AggregationHandleMin::mergeStates(const AggregationState &source,
   }
 }
 
-void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleMin::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
   TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
 
   if (!(src_min_ptr->isNull())) {
-    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+    compareAndUpdate(dst_min_ptr, *src_min_ptr);
   }
 }
 
 ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMin,
-                                     AggregationStateFastHashTable>(
-      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleMin>(
+      type_, hash_table, index, group_by_keys);
 }
 
-AggregationState*
-AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateMin>(distinctify_hash_table);
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+      AggregationHandleMin, AggregationStateMin>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMin>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index e3472ec..9c0012e 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -21,15 +21,14 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_MIN_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -40,9 +39,8 @@
 
 namespace quickstep {
 
+class AggregationStateHashTableBase;
 class ColumnVector;
-class StorageManager;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -88,42 +86,45 @@ class AggregationHandleMin : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMin() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMin(type_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with min aggregation state.
-   */
   inline void iterateUnaryInl(AggregationStateMin *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     compareAndUpdate(state, value);
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(min_ptr, value);
+    compareAndUpdate(min_ptr, value);
   }
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  AggregationState* accumulateValueAccessor(
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  TypedValue finalize(const AggregationState &state) const override {
+    return static_cast<const AggregationStateMin &>(state).min_;
+  }
+
+  std::size_t getPayloadSize() const override {
+    return sizeof(TypedValue);
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
@@ -138,68 +139,33 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return static_cast<const AggregationStateMin &>(state).min_;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdate(min_ptr, argument);
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateMin &>(state).min_;
-  }
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
-  inline TypedValue finalizeHashTableEntryFast(
+  inline TypedValue finalizeHashTableEntry(
       const std::uint8_t *byte_ptr) const {
-    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
-    return TypedValue(*min_ptr);
+    return *reinterpret_cast<const TypedValue *>(byte_ptr);
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
-  /**
-   * @brief Implementation of
-   * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MIN aggregation.
-   */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MIN aggregation.
-   */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionMin;
@@ -228,8 +194,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
-  inline void compareAndUpdateFast(TypedValue *min_ptr,
-                                   const TypedValue &value) const {
+  inline void compareAndUpdate(TypedValue *min_ptr,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
     if (min_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *min_ptr)) {
@@ -240,8 +206,6 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..9f5f220 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -20,13 +20,13 @@
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
-#include <utility>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -40,10 +40,11 @@
 
 namespace quickstep {
 
-class StorageManager;
+class ColumnVector;
 
 AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kSum),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -79,47 +80,27 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
   result_type_ = &sum_type.getNullableVersion();
 }
 
-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleSum::accumulateValueAccessor(
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const ValueAccessorMultiplexer &accessor_mux) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for SUM: " << argument_ids.size();
 
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-  std::size_t num_tuples = 0;
-  TypedValue cv_sum = fast_operator_->accumulateColumnVector(
-      blank_state_.sum_, *column_vectors.front(), &num_tuples);
-  return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   std::size_t num_tuples = 0;
-  TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-      blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+  TypedValue va_sum =
+      fast_operator_->accumulateValueAccessor(
+          blank_state_.sum_,
+          accessor_mux.getValueAccessorBySource(argument_source),
+          argument_id,
+          &num_tuples);
   return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
 }
-#endif
-
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}
 
 void AggregationHandleSum::mergeStates(const AggregationState &source,
                                        AggregationState *destination) const {
@@ -134,8 +115,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source,
   sum_destination->null_ = sum_destination->null_ && sum_source.null_;
 }
 
-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleSum::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_sum_ptr =
       reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
   const bool *src_null_ptr =
@@ -162,29 +143,25 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleSum::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleSum,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleSum>(
+      *result_type_, hash_table, index, group_by_keys);
 }
 
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleSum,
-      AggregationStateSum>(distinctify_hash_table);
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+      AggregationHandleSum, AggregationStateSum>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleSum,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleSum>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index f0d23e1..edab7bb 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -21,15 +21,14 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_SUM_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -40,9 +39,8 @@
 
 namespace quickstep {
 
+class AggregationStateHashTableBase;
 class ColumnVector;
-class StorageManager;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -101,16 +99,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
  public:
   ~AggregationHandleSum() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&argument_type_};
+  }
+
+  const Type* getResultType() const override {
+    return result_type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateSum(blank_state_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
   inline void iterateUnaryInl(AggregationStateSum *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
@@ -121,8 +121,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     state->null_ = false;
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
     TypedValue *sum_ptr =
@@ -133,16 +133,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     *null_ptr = false;
   }
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  AggregationState* accumulateValueAccessor(
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  TypedValue finalize(const AggregationState &state) const override;
+
+  std::size_t getPayloadSize() const override {
+    return blank_state_.getPayloadSize();
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *sum_ptr =
@@ -161,70 +163,37 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override;
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateSum &>(state).sum_;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (argument.isNull()) return;
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    bool *null_ptr =
+        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, argument);
+    *null_ptr = false;
   }
 
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
-    return *sum_ptr;
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
+
+  inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+    return *reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset_);
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for SUM aggregation.
-   */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for SUM aggregation.
-   */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override {
-    return blank_state_.getPayloadSize();
-  }
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionSum;
@@ -242,8 +211,6 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
   std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index e9503f7..4220a8d 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -55,9 +55,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg
 add_library(quickstep_expressions_aggregation_AggregationHandleCount
             AggregationHandleCount.cpp
             AggregationHandleCount.hpp)
-add_library(quickstep_expressions_aggregation_AggregationHandleDistinct
-            AggregationHandleDistinct.cpp
-            AggregationHandleDistinct.hpp)
 add_library(quickstep_expressions_aggregation_AggregationHandleMax
             AggregationHandleMax.cpp
             AggregationHandleMax.hpp)
@@ -144,20 +141,21 @@ target_link_libraries(quickstep_expressions_aggregation_AggregateFunctionSum
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandle
                       glog
-                      quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
                       glog
-                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_types_TypedValue
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
@@ -165,10 +163,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -183,39 +179,25 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
+                      quickstep_types_LongType
                       quickstep_types_TypeFactory
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
-                      quickstep_types_containers_ColumnVector
-                      quickstep_types_containers_ColumnVectorUtil
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct
-                      glog
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_types_TypedValue
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_types_containers_ColumnVector
                       quickstep_types_operations_comparisons_Comparison
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
@@ -225,14 +207,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_types_containers_ColumnVector
                       quickstep_types_operations_comparisons_Comparison
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
@@ -242,10 +221,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -271,7 +248,6 @@ target_link_libraries(quickstep_expressions_aggregation
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_expressions_aggregation_AggregationHandleCount
-                      quickstep_expressions_aggregation_AggregationHandleDistinct
                       quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum
@@ -301,7 +277,9 @@ target_link_libraries(AggregationHandle_tests
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableBase
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_types_CharType
                       quickstep_types_DateOperatorOverloads
                       quickstep_types_DatetimeIntervalType

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
index 79d4448..0ad50d5 100644
--- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
@@ -29,8 +29,9 @@
 #include "expressions/aggregation/AggregationHandleAvg.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/CharType.hpp"
 #include "types/DateOperatorOverloads.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -46,10 +47,7 @@
 #include "types/VarCharType.hpp"
 #include "types/YearMonthIntervalType.hpp"
 #include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
 
 #include "gtest/gtest.h"
 
@@ -192,39 +190,6 @@ class AggregationHandleAvgTest : public ::testing::Test {
   }
 
   template <typename GenericType, typename OutputType = DoubleType>
-  void checkAggregationAvgGenericColumnVector() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
-            .isNull());
-
-    typename GenericType::cpptype sum;
-    SetDataType(0, &sum);
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType>(type, &sum));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *cv_state);
-
-    aggregation_handle_avg_->mergeStates(*cv_state,
-                                         aggregation_handle_avg_state_.get());
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *aggregation_handle_avg_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename GenericType, typename OutputType = DoubleType>
   void checkAggregationAvgGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
@@ -240,7 +205,8 @@ class AggregationHandleAvgTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_avg_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -256,7 +222,6 @@ class AggregationHandleAvgTest : public ::testing::Test {
         *aggregation_handle_avg_,
         *aggregation_handle_avg_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
   std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
@@ -311,33 +276,6 @@ TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) {
   checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
 }
 
-TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<DatetimeIntervalType,
-                                         DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<YearMonthIntervalType,
-                                         YearMonthIntervalType>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) {
   checkAggregationAvgGenericValueAccessor<IntType>();
 }
@@ -363,7 +301,6 @@ TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
   checkAggregationAvgGenericValueAccessor<YearMonthIntervalType,
                                           YearMonthIntervalType>();
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 #ifdef QUICKSTEP_DEBUG
 TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {
@@ -468,28 +405,25 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
   initializeHandle(long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_avg_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_avg_.get()->getPayloadSize()},
           {aggregation_handle_avg_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_avg_.get()->getPayloadSize()},
           {aggregation_handle_avg_.get()},
           storage_manager_.get()));
 
-  AggregationStateFastHashTable *destination_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(
-          destination_hash_table.get());
+  PackedPayloadHashTable *destination_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
 
-  AggregationStateFastHashTable *source_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+  PackedPayloadHashTable *source_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(source_hash_table.get());
 
   AggregationHandleAvg *aggregation_handle_avg_derived =
       static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
@@ -546,29 +480,29 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
   memcpy(buffer + 1,
          common_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_avg_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(common_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          common_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_avg_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+  destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_avg_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_avg_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+  destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
                                                       buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  AggregationOperationState::mergeGroupByHashTables(
-      source_hash_table.get(), destination_hash_table.get());
+  HashTableMerger merger(destination_hash_table_derived);
+  source_hash_table_derived->forEachCompositeKey(&merger);
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
@@ -576,21 +510,19 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
       (common_key_destination_avg_val.getLiteral<std::int64_t>() +
        common_key_source_avg_val.getLiteral<std::int64_t>()) /
           static_cast<double>(2),
-      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+      aggregation_handle_avg_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(common_key) +
           1));
   CheckAvgValue<double>(
       exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
-      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+      aggregation_handle_avg_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(
-              exclusive_destination_key) +
-          1));
+              exclusive_destination_key) + 1));
   CheckAvgValue<double>(
       exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
-      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+      aggregation_handle_avg_derived->finalizeHashTableEntry(
           source_hash_table_derived->getSingleCompositeKey(
-              exclusive_source_key) +
-          1));
+              exclusive_source_key) + 1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
index 78bd249..86a014b 100644
--- a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
@@ -30,8 +30,9 @@
 #include "expressions/aggregation/AggregationHandleCount.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/CharType.hpp"
 #include "types/DoubleType.hpp"
 #include "types/FloatType.hpp"
@@ -43,10 +44,7 @@
 #include "types/TypedValue.hpp"
 #include "types/VarCharType.hpp"
 #include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
 
 #include "gtest/gtest.h"
 
@@ -216,32 +214,6 @@ class AggregationHandleCountTest : public ::testing::Test {
   }
 
   template <typename NumericType>
-  void checkAggregationCountNumericColumnVector(int test_count) {
-    const NumericType &type = NumericType::Instance(true);
-    initializeHandle(&type);
-    CheckCountValue(
-        0, *aggregation_handle_count_, *aggregation_handle_count_state_);
-
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorNumeric<NumericType>(type, test_count));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_count_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckCountValue(test_count, *aggregation_handle_count_, *cv_state);
-
-    aggregation_handle_count_->mergeStates(
-        *cv_state, aggregation_handle_count_state_.get());
-    CheckCountValue(test_count,
-                    *aggregation_handle_count_,
-                    *aggregation_handle_count_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename NumericType>
   void checkAggregationCountNumericValueAccessor(int test_count) {
     const NumericType &type = NumericType::Instance(true);
     initializeHandle(&type);
@@ -255,7 +227,8 @@ class AggregationHandleCountTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_count_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -267,7 +240,6 @@ class AggregationHandleCountTest : public ::testing::Test {
                     *aggregation_handle_count_,
                     *aggregation_handle_count_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   template <typename StringType>
   void checkAggregationCountString(int test_count) {
@@ -326,32 +298,6 @@ class AggregationHandleCountTest : public ::testing::Test {
   }
 
   template <typename StringType, typename ColumnVectorType>
-  void checkAggregationCountStringColumnVector(int test_count) {
-    const StringType &type = StringType::Instance(10, true);
-    initializeHandle(&type);
-    CheckCountValue(
-        0, *aggregation_handle_count_, *aggregation_handle_count_state_);
-
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorString<ColumnVectorType>(type, test_count));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_count_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckCountValue(test_count, *aggregation_handle_count_, *cv_state);
-
-    aggregation_handle_count_->mergeStates(
-        *cv_state, aggregation_handle_count_state_.get());
-    CheckCountValue(test_count,
-                    *aggregation_handle_count_,
-                    *aggregation_handle_count_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename StringType, typename ColumnVectorType>
   void checkAggregationCountStringValueAccessor(int test_count) {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(&type);
@@ -365,7 +311,8 @@ class AggregationHandleCountTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_count_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -377,7 +324,6 @@ class AggregationHandleCountTest : public ::testing::Test {
                     *aggregation_handle_count_,
                     *aggregation_handle_count_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   std::unique_ptr<AggregationHandle> aggregation_handle_count_;
   std::unique_ptr<AggregationState> aggregation_handle_count_state_;
@@ -425,38 +371,6 @@ TEST_F(AggregationHandleCountTest, VarCharTypeTest) {
   checkAggregationCountString<VarCharType>(10000);
 }
 
-TEST_F(AggregationHandleCountTest, IntTypeColumnVectorTest) {
-  checkAggregationCountNumericColumnVector<IntType>(0);
-  checkAggregationCountNumericColumnVector<IntType>(10000);
-}
-
-TEST_F(AggregationHandleCountTest, LongTypeColumnVectorTest) {
-  checkAggregationCountNumericColumnVector<LongType>(0);
-  checkAggregationCountNumericColumnVector<LongType>(10000);
-}
-
-TEST_F(AggregationHandleCountTest, FloatTypeColumnVectorTest) {
-  checkAggregationCountNumericColumnVector<FloatType>(0);
-  checkAggregationCountNumericColumnVector<FloatType>(10000);
-}
-
-TEST_F(AggregationHandleCountTest, DoubleTypeColumnVectorTest) {
-  checkAggregationCountNumericColumnVector<DoubleType>(0);
-  checkAggregationCountNumericColumnVector<DoubleType>(10000);
-}
-
-TEST_F(AggregationHandleCountTest, CharTypeColumnVectorTest) {
-  checkAggregationCountStringColumnVector<CharType, NativeColumnVector>(0);
-  checkAggregationCountStringColumnVector<CharType, NativeColumnVector>(10000);
-}
-
-TEST_F(AggregationHandleCountTest, VarCharTypeColumnVectorTest) {
-  checkAggregationCountStringColumnVector<VarCharType, IndirectColumnVector>(0);
-  checkAggregationCountStringColumnVector<VarCharType, IndirectColumnVector>(
-      10000);
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 TEST_F(AggregationHandleCountTest, IntTypeValueAccessorTest) {
   checkAggregationCountNumericValueAccessor<IntType>(0);
   checkAggregationCountNumericValueAccessor<IntType>(10000);
@@ -488,7 +402,6 @@ TEST_F(AggregationHandleCountTest, VarCharTypeValueAccessorTest) {
   checkAggregationCountStringValueAccessor<VarCharType, IndirectColumnVector>(
       10000);
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 TEST_F(AggregationHandleCountTest, canApplyToTypeTest) {
   EXPECT_TRUE(ApplyToTypesTest(kInt));
@@ -511,28 +424,25 @@ TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) {
   initializeHandle(&long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_count_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_count_.get()->getPayloadSize()},
           {aggregation_handle_count_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_count_.get()->getPayloadSize()},
           {aggregation_handle_count_.get()},
           storage_manager_.get()));
 
-  AggregationStateFastHashTable *destination_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(
-          destination_hash_table.get());
+  PackedPayloadHashTable *destination_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
 
-  AggregationStateFastHashTable *source_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+  PackedPayloadHashTable *source_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(source_hash_table.get());
 
   // TODO(harshad) - Use TemplateUtil::CreateBoolInstantiatedInstance to
   // generate all the combinations of the bool template arguments and test them.
@@ -612,49 +522,48 @@ TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) {
   memcpy(buffer + 1,
          common_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_count_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(common_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          common_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_count_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+  destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_count_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_count_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
-                                                      buffer);
+  destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+                                                     buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  AggregationOperationState::mergeGroupByHashTables(
-      source_hash_table.get(), destination_hash_table.get());
+  HashTableMerger merger(destination_hash_table_derived);
+  source_hash_table_derived->forEachCompositeKey(&merger);
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckCountValue(
       common_key_destination_count_val.getLiteral<std::int64_t>() +
           common_key_source_count_val.getLiteral<std::int64_t>(),
-      aggregation_handle_count_derived->finalizeHashTableEntryFast(
+      aggregation_handle_count_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(common_key) +
           1));
   CheckCountValue(
       exclusive_key_destination_count_val.getLiteral<std::int64_t>(),
-      aggregation_handle_count_derived->finalizeHashTableEntryFast(
+      aggregation_handle_count_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(
-              exclusive_destination_key) +
-          1));
-  CheckCountValue(exclusive_key_source_count_val.getLiteral<std::int64_t>(),
-                  aggregation_handle_count_derived->finalizeHashTableEntryFast(
-                      source_hash_table_derived->getSingleCompositeKey(
-                          exclusive_source_key) +
-                      1));
+              exclusive_destination_key) + 1));
+  CheckCountValue(
+      exclusive_key_source_count_val.getLiteral<std::int64_t>(),
+      aggregation_handle_count_derived->finalizeHashTableEntry(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) + 1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/387275f7/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
index 026bd1d..d5e8d18 100644
--- a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
@@ -32,9 +32,10 @@
 #include "expressions/aggregation/AggregationHandleMax.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DatetimeLit.hpp"
@@ -51,10 +52,7 @@
 #include "types/VarCharType.hpp"
 #include "types/YearMonthIntervalType.hpp"
 #include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
 
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
@@ -223,34 +221,6 @@ class AggregationHandleMaxTest : public ::testing::Test {
   }
 
   template <typename GenericType>
-  void checkAggregationMaxGenericColumnVector() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_max_->finalize(*aggregation_handle_max_state_)
-            .isNull());
-
-    typename GenericType::cpptype max;
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType>(type, &max));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_max_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckMaxValue<typename GenericType::cpptype>(
-        max, *aggregation_handle_max_, *cv_state);
-
-    aggregation_handle_max_->mergeStates(*cv_state,
-                                         aggregation_handle_max_state_.get());
-    CheckMaxValue<typename GenericType::cpptype>(
-        max, *aggregation_handle_max_, *aggregation_handle_max_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename GenericType>
   void checkAggregationMaxGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
@@ -266,7 +236,8 @@ class AggregationHandleMaxTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_max_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -278,7 +249,6 @@ class AggregationHandleMaxTest : public ::testing::Test {
     CheckMaxValue<typename GenericType::cpptype>(
         max, *aggregation_handle_max_, *aggregation_handle_max_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   template <typename StringType>
   void checkAggregationMaxString() {
@@ -385,33 +355,6 @@ class AggregationHandleMaxTest : public ::testing::Test {
   }
 
   template <typename StringType, typename ColumnVectorType>
-  void checkAggregationMaxStringColumnVector() {
-    const StringType &type = StringType::Instance(10, true);
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_max_->finalize(*aggregation_handle_max_state_)
-            .isNull());
-
-    std::string max;
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorString<ColumnVectorType>(type, &max));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_max_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckMaxString(max, *aggregation_handle_max_, *cv_state);
-
-    aggregation_handle_max_->mergeStates(*cv_state,
-                                         aggregation_handle_max_state_.get());
-    CheckMaxString(
-        max, *aggregation_handle_max_, *aggregation_handle_max_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename StringType, typename ColumnVectorType>
   void checkAggregationMaxStringValueAccessor() {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(type);
@@ -426,7 +369,8 @@ class AggregationHandleMaxTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_max_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -437,7 +381,6 @@ class AggregationHandleMaxTest : public ::testing::Test {
     CheckMaxString(
         max, *aggregation_handle_max_, *aggregation_handle_max_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   std::unique_ptr<AggregationHandle> aggregation_handle_max_;
   std::unique_ptr<AggregationState> aggregation_handle_max_state_;
@@ -514,43 +457,6 @@ TEST_F(AggregationHandleMaxTest, VarCharTypeTest) {
   checkAggregationMaxString<VarCharType>();
 }
 
-TEST_F(AggregationHandleMaxTest, IntTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleMaxTest, LongTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleMaxTest, FloatTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleMaxTest, DoubleTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleMaxTest, DatetimeTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<DatetimeType>();
-}
-
-TEST_F(AggregationHandleMaxTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleMaxTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationMaxGenericColumnVector<YearMonthIntervalType>();
-}
-
-TEST_F(AggregationHandleMaxTest, CharTypeColumnVectorTest) {
-  checkAggregationMaxStringColumnVector<CharType, NativeColumnVector>();
-}
-
-TEST_F(AggregationHandleMaxTest, VarCharColumnVectorTypeTest) {
-  checkAggregationMaxStringColumnVector<VarCharType, IndirectColumnVector>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 TEST_F(AggregationHandleMaxTest, IntTypeValueAccessorTest) {
   checkAggregationMaxGenericValueAccessor<IntType>();
 }
@@ -586,7 +492,6 @@ TEST_F(AggregationHandleMaxTest, CharTypeValueAccessorTest) {
 TEST_F(AggregationHandleMaxTest, VarCharValueAccessorTypeTest) {
   checkAggregationMaxStringValueAccessor<VarCharType, IndirectColumnVector>();
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 #ifdef QUICKSTEP_DEBUG
 TEST_F(AggregationHandleMaxDeathTest, WrongTypeTest) {
@@ -689,28 +594,25 @@ TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) {
   initializeHandle(int_non_null_type);
   storage_manager_.reset(new StorageManager("./test_max_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
-          {aggregation_handle_max_.get()->getPayloadSize()},
           {aggregation_handle_max_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
-          {aggregation_handle_max_.get()->getPayloadSize()},
           {aggregation_handle_max_.get()},
           storage_manager_.get()));
 
-  AggregationStateFastHashTable *destination_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(
-          destination_hash_table.get());
+  PackedPayloadHashTable *destination_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
 
-  AggregationStateFastHashTable *source_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+  PackedPayloadHashTable *source_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(source_hash_table.get());
 
   AggregationHandleMax *aggregation_handle_max_derived =
       static_cast<AggregationHandleMax *>(aggregation_handle_max_.get());
@@ -780,47 +682,47 @@ TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) {
   memcpy(buffer + 1,
          common_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_max_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(common_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          common_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_max_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+  destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_max_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_max_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
-                                                      buffer);
+  destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+                                                     buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  AggregationOperationState::mergeGroupByHashTables(
-      source_hash_table.get(), destination_hash_table.get());
+  HashTableMerger merger(destination_hash_table_derived);
+  source_hash_table_derived->forEachCompositeKey(&merger);
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckMaxValue<int>(
       common_key_destination_max_val.getLiteral<int>(),
-      aggregation_handle_max_derived->finalizeHashTableEntryFast(
+      aggregation_handle_max_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(common_key) +
           1));
-  CheckMaxValue<int>(exclusive_key_destination_max_val.getLiteral<int>(),
-                     aggregation_handle_max_derived->finalizeHashTableEntryFast(
-                         destination_hash_table_derived->getSingleCompositeKey(
-                             exclusive_destination_key) +
-                         1));
-  CheckMaxValue<int>(exclusive_key_source_max_val.getLiteral<int>(),
-                     aggregation_handle_max_derived->finalizeHashTableEntryFast(
-                         source_hash_table_derived->getSingleCompositeKey(
-                             exclusive_source_key) +
-                         1));
+  CheckMaxValue<int>(
+      exclusive_key_destination_max_val.getLiteral<int>(),
+      aggregation_handle_max_derived->finalizeHashTableEntry(
+          destination_hash_table_derived->getSingleCompositeKey(
+              exclusive_destination_key) + 1));
+  CheckMaxValue<int>(
+      exclusive_key_source_max_val.getLiteral<int>(),
+      aggregation_handle_max_derived->finalizeHashTableEntry(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) + 1));
 }
 
 }  // namespace quickstep