You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/17 21:56:03 UTC

[5/6] incubator-quickstep git commit: Updates

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..d40ae9f 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -38,100 +38,100 @@ namespace quickstep {
 
 class StorageManager;
 
-AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type), block_update_(false) {
-  fast_comparator_.reset(
-      ComparisonFactory::GetComparison(ComparisonID::kGreater)
-          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
-}
-
-AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
-
-  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MAX: " << accessor_ids.size();
-
-  return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
-      type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MAX: " << argument_ids.size();
-}
-
-void AggregationHandleMax::mergeStates(const AggregationState &source,
-                                       AggregationState *destination) const {
-  const AggregationStateMax &max_source =
-      static_cast<const AggregationStateMax &>(source);
-  AggregationStateMax *max_destination =
-      static_cast<AggregationStateMax *>(destination);
-
-  if (!max_source.max_.isNull()) {
-    compareAndUpdate(max_destination, max_source.max_);
-  }
-}
-
-void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
-  const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
-  TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
-  if (!(src_max_ptr->isNull())) {
-    compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
-  }
-}
-
-ColumnVector* AggregationHandleMax::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMax,
-                                     AggregationStateFastHashTable>(
-      type_.getNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMax,
-      AggregationStateMax>(distinctify_hash_table);
-}
-
-void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleMax,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleMax::AggregationHandleMax(const Type &type) {}
+//    : type_(type), block_update_(false) {
+//  fast_comparator_.reset(
+//      ComparisonFactory::GetComparison(ComparisonID::kGreater)
+//          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
+//    const HashTableImplType hash_table_impl,
+//    const std::vector<const Type *> &group_by_types,
+//    const std::size_t estimated_num_groups,
+//    StorageManager *storage_manager) const {
+//  return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
+//      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleMax::accumulateColumnVectors(
+//    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+//  DCHECK_EQ(1u, column_vectors.size())
+//      << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+//
+//  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
+//      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleMax::accumulateValueAccessor(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &accessor_ids) const {
+//  DCHECK_EQ(1u, accessor_ids.size())
+//      << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+//
+//  return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
+//      type_.getNullableVersion().makeNullValue(),
+//      accessor,
+//      accessor_ids.front()));
+//}
+//#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//
+//void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &argument_ids,
+//    const std::vector<attribute_id> &group_by_key_ids,
+//    AggregationStateHashTableBase *hash_table) const {
+//  DCHECK_EQ(1u, argument_ids.size())
+//      << "Got wrong number of arguments for MAX: " << argument_ids.size();
+//}
+//
+//void AggregationHandleMax::mergeStates(const AggregationState &source,
+//                                       AggregationState *destination) const {
+//  const AggregationStateMax &max_source =
+//      static_cast<const AggregationStateMax &>(source);
+//  AggregationStateMax *max_destination =
+//      static_cast<AggregationStateMax *>(destination);
+//
+//  if (!max_source.max_.isNull()) {
+//    compareAndUpdate(max_destination, max_source.max_);
+//  }
+//}
+//
+//void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
+//                                           std::uint8_t *destination) const {
+//  const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
+//  TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
+//  if (!(src_max_ptr->isNull())) {
+//    compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+//  }
+//}
+//
+//ColumnVector* AggregationHandleMax::finalizeHashTable(
+//    const AggregationStateHashTableBase &hash_table,
+//    std::vector<std::vector<TypedValue>> *group_by_keys,
+//    int index) const {
+//  return finalizeHashTableHelperFast<AggregationHandleMax,
+//                                     AggregationStateFastHashTable>(
+//      type_.getNullableVersion(), hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+//    const AggregationStateHashTableBase &distinctify_hash_table) const {
+//  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+//      AggregationHandleMax,
+//      AggregationStateMax>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
+//    const AggregationStateHashTableBase &distinctify_hash_table,
+//    AggregationStateHashTableBase *aggregation_hash_table,
+//    std::size_t index) const {
+//  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+//      AggregationHandleMax,
+//      AggregationStateFastHashTable>(
+//      distinctify_hash_table, aggregation_hash_table, index);
+//}
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 5fb9f44..effc38f 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -26,9 +26,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -49,149 +47,12 @@ class ValueAccessor;
  */
 
 /**
- * @brief Aggregation state for max.
- */
-class AggregationStateMax : public AggregationState {
- public:
-  /**
-   * @brief Copy constructor (ignores mutex).
-   */
-  AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {}
-
-  /**
-   * @brief Destructor.
-   */
-  ~AggregationStateMax() override{};
-
-  const std::uint8_t* getPayloadAddress() const {
-    return reinterpret_cast<const uint8_t *>(&max_);
-  }
-
- private:
-  friend class AggregationHandleMax;
-
-  explicit AggregationStateMax(const Type &type)
-      : max_(type.getNullableVersion().makeNullValue()) {}
-
-  explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {}
-
-  TypedValue max_;
-  SpinMutex mutex_;
-};
-
-/**
  * @brief An aggregationhandle for max.
  **/
-class AggregationHandleMax : public AggregationConcreteHandle {
+class AggregationHandleMax : public AggregationHandle {
  public:
   ~AggregationHandleMax() override {}
 
-  AggregationState* createInitialState() const override {
-    return new AggregationStateMax(type_);
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with max aggregation state.
-   */
-  inline void iterateUnaryInl(AggregationStateMax *state,
-                              const TypedValue &value) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
-  }
-
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(max_ptr, value);
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    TypedValue t1 = (type_.getNullableVersion().makeNullValue());
-    *max_ptr = t1;
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
-  }
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
-    return TypedValue(*max_ptr);
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MAX aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MAX aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
  private:
   friend class AggregateFunctionMax;
 
@@ -202,37 +63,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
    **/
   explicit AggregationHandleMax(const Type &type);
 
-  /**
-   * @brief compare the value with max_ and update it if the value is larger
-   *        than current maximum. NULLs are ignored.
-   *
-   * @param value A TypedValue to compare
-   **/
-  inline void compareAndUpdate(AggregationStateMax *state,
-                               const TypedValue &value) const {
-    // TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type.
-    if (value.isNull()) return;
-
-    SpinMutexLock lock(state->mutex_);
-    if (state->max_.isNull() ||
-        fast_comparator_->compareTypedValues(value, state->max_)) {
-      state->max_ = value;
-    }
-  }
-
-  inline void compareAndUpdateFast(TypedValue *max_ptr,
-                                   const TypedValue &value) const {
-    if (value.isNull()) return;
-    if (max_ptr->isNull() ||
-        fast_comparator_->compareTypedValues(value, *max_ptr)) {
-      *max_ptr = value;
-    }
-  }
-
-  const Type &type_;
-  std::unique_ptr<UncheckedComparator> fast_comparator_;
-
-  bool block_update_;
+//  const Type &type_;
+//  std::unique_ptr<UncheckedComparator> fast_comparator_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..4765c93 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -38,101 +38,101 @@ namespace quickstep {
 
 class StorageManager;
 
-AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type), block_update_(false) {
-  fast_comparator_.reset(
-      ComparisonFactory::GetComparison(ComparisonID::kLess)
-          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
-}
-
-AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
-
-  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
-
-  return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
-      type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MIN: " << argument_ids.size();
-}
-
-void AggregationHandleMin::mergeStates(const AggregationState &source,
-                                       AggregationState *destination) const {
-  const AggregationStateMin &min_source =
-      static_cast<const AggregationStateMin &>(source);
-  AggregationStateMin *min_destination =
-      static_cast<AggregationStateMin *>(destination);
-
-  if (!min_source.min_.isNull()) {
-    compareAndUpdate(min_destination, min_source.min_);
-  }
-}
-
-void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
-  const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
-  TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
-
-  if (!(src_min_ptr->isNull())) {
-    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
-  }
-}
-
-ColumnVector* AggregationHandleMin::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMin,
-                                     AggregationStateFastHashTable>(
-      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateMin>(distinctify_hash_table);
-}
-
-void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleMin::AggregationHandleMin(const Type &type) {}
+//    : type_(type), block_update_(false) {
+//  fast_comparator_.reset(
+//      ComparisonFactory::GetComparison(ComparisonID::kLess)
+//          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
+//    const HashTableImplType hash_table_impl,
+//    const std::vector<const Type *> &group_by_types,
+//    const std::size_t estimated_num_groups,
+//    StorageManager *storage_manager) const {
+//  return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
+//      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleMin::accumulateColumnVectors(
+//    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+//  DCHECK_EQ(1u, column_vectors.size())
+//      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+//
+//  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
+//      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleMin::accumulateValueAccessor(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &accessor_ids) const {
+//  DCHECK_EQ(1u, accessor_ids.size())
+//      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+//
+//  return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
+//      type_.getNullableVersion().makeNullValue(),
+//      accessor,
+//      accessor_ids.front()));
+//}
+//#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//
+//void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &argument_ids,
+//    const std::vector<attribute_id> &group_by_key_ids,
+//    AggregationStateHashTableBase *hash_table) const {
+//  DCHECK_EQ(1u, argument_ids.size())
+//      << "Got wrong number of arguments for MIN: " << argument_ids.size();
+//}
+//
+//void AggregationHandleMin::mergeStates(const AggregationState &source,
+//                                       AggregationState *destination) const {
+//  const AggregationStateMin &min_source =
+//      static_cast<const AggregationStateMin &>(source);
+//  AggregationStateMin *min_destination =
+//      static_cast<AggregationStateMin *>(destination);
+//
+//  if (!min_source.min_.isNull()) {
+//    compareAndUpdate(min_destination, min_source.min_);
+//  }
+//}
+//
+//void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
+//                                           std::uint8_t *destination) const {
+//  const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
+//  TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
+//
+//  if (!(src_min_ptr->isNull())) {
+//    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+//  }
+//}
+//
+//ColumnVector* AggregationHandleMin::finalizeHashTable(
+//    const AggregationStateHashTableBase &hash_table,
+//    std::vector<std::vector<TypedValue>> *group_by_keys,
+//    int index) const {
+//  return finalizeHashTableHelperFast<AggregationHandleMin,
+//                                     AggregationStateFastHashTable>(
+//      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
+//    const AggregationStateHashTableBase &distinctify_hash_table) const {
+//  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+//      AggregationHandleMin,
+//      AggregationStateMin>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
+//    const AggregationStateHashTableBase &distinctify_hash_table,
+//    AggregationStateHashTableBase *aggregation_hash_table,
+//    std::size_t index) const {
+//  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+//      AggregationHandleMin,
+//      AggregationStateFastHashTable>(
+//      distinctify_hash_table, aggregation_hash_table, index);
+//}
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 173911d..64fddea 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -26,11 +26,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
@@ -49,151 +46,12 @@ class ValueAccessor;
  */
 
 /**
- * @brief Aggregation state for min.
- */
-class AggregationStateMin : public AggregationState {
- public:
-  /**
-   * @brief Copy constructor (ignores mutex).
-   */
-  AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {}
-
-  /**
-   * @brief Destructor.
-   */
-  ~AggregationStateMin() override {}
-
-  std::size_t getPayloadSize() const { return sizeof(TypedValue); }
-
-  const std::uint8_t *getPayloadAddress() const {
-    return reinterpret_cast<const uint8_t *>(&min_);
-  }
-
- private:
-  friend class AggregationHandleMin;
-
-  explicit AggregationStateMin(const Type &type)
-      : min_(type.getNullableVersion().makeNullValue()) {}
-
-  explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {}
-
-  TypedValue min_;
-  SpinMutex mutex_;
-};
-
-/**
  * @brief An aggregationhandle for min.
  **/
-class AggregationHandleMin : public AggregationConcreteHandle {
+class AggregationHandleMin : public AggregationHandle {
  public:
   ~AggregationHandleMin() override {}
 
-  AggregationState* createInitialState() const override {
-    return new AggregationStateMin(type_);
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with min aggregation state.
-   */
-  inline void iterateUnaryInl(AggregationStateMin *state,
-                              const TypedValue &value) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    compareAndUpdate(state, value);
-  }
-
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(min_ptr, value);
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    TypedValue t1 = (type_.getNullableVersion().makeNullValue());
-    *min_ptr = t1;
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return static_cast<const AggregationStateMin &>(state).min_;
-  }
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateMin &>(state).min_;
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
-    return TypedValue(*min_ptr);
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
-  /**
-   * @brief Implementation of
-   * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MIN aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MIN aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
  private:
   friend class AggregateFunctionMin;
 
@@ -204,36 +62,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
    **/
   explicit AggregationHandleMin(const Type &type);
 
-  /**
-   * @brief compare the value with min_ and update it if the value is smaller
-   *        than current minimum. NULLs are ignored.
-   *
-   * @param value A TypedValue to compare.
-   **/
-  inline void compareAndUpdate(AggregationStateMin *state,
-                               const TypedValue &value) const {
-    if (value.isNull()) return;
-
-    SpinMutexLock lock(state->mutex_);
-    if (state->min_.isNull() ||
-        fast_comparator_->compareTypedValues(value, state->min_)) {
-      state->min_ = value;
-    }
-  }
-
-  inline void compareAndUpdateFast(TypedValue *min_ptr,
-                                   const TypedValue &value) const {
-    if (value.isNull()) return;
-    if (min_ptr->isNull() ||
-        fast_comparator_->compareTypedValues(value, *min_ptr)) {
-      *min_ptr = value;
-    }
-  }
-
-  const Type &type_;
-  std::unique_ptr<UncheckedComparator> fast_comparator_;
-
-  bool block_update_;
+//  const Type &type_;
+//  std::unique_ptr<UncheckedComparator> fast_comparator_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..4e77ed0 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -20,6 +20,7 @@
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 
 #include <cstddef>
+#include <cstring>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -35,6 +36,7 @@
 #include "types/operations/binary_operations/BinaryOperation.hpp"
 #include "types/operations/binary_operations/BinaryOperationFactory.hpp"
 #include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/TypeFunctors.hpp"
 
 #include "glog/logging.h"
 
@@ -42,12 +44,11 @@ namespace quickstep {
 
 class StorageManager;
 
-AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type), block_update_(false) {
+AggregationHandleSum::AggregationHandleSum(const Type &argument_type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
-  switch (argument_type_.getTypeID()) {
+  switch (argument_type.getTypeID()) {
     case kInt:
     case kLong:
       type_precision_id = kLong;
@@ -57,134 +58,57 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
       type_precision_id = kDouble;
       break;
     default:
-      type_precision_id = type.getTypeID();
+      type_precision_id = argument_type.getTypeID();
       break;
   }
 
   const Type &sum_type = TypeFactory::GetType(type_precision_id);
-  blank_state_.sum_ = sum_type.makeZeroValue();
+  state_size_ = sum_type.maximumByteLength();
+  blank_state_.reset(state_size_, false);
+
+  tv_blank_state_ = sum_type.makeZeroValue();
 
   // Make operators to do arithmetic:
   // Add operator for summing argument values.
-  fast_operator_.reset(
+  accumulate_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type));
+  accumulate_functor_ = accumulate_operator_->getMergeFunctor();
+
   // Add operator for merging states.
   merge_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
           .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+  merge_functor_ = merge_operator_->getMergeFunctor();
 
-  // Result is nullable, because SUM() over 0 values (or all NULL values) is
-  // NULL.
-  result_type_ = &sum_type.getNullableVersion();
+  finalize_functor_ = MakeUntypedCopyFunctor(&sum_type);
+  result_type_ = &sum_type;
 }
 
-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
+void AggregationHandleSum::accumulateColumnVectors(
+    void *state,
     const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
   DCHECK_EQ(1u, column_vectors.size())
       << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
   std::size_t num_tuples = 0;
-  TypedValue cv_sum = fast_operator_->accumulateColumnVector(
-      blank_state_.sum_, *column_vectors.front(), &num_tuples);
-  return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
+  TypedValue cv_sum = accumulate_operator_->accumulateColumnVector(
+      tv_blank_state_, *column_vectors.front(), &num_tuples);
+  cv_sum.copyInto(state);
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
+void AggregationHandleSum::accumulateValueAccessor(
+    void *state,
     ValueAccessor *accessor,
     const std::vector<attribute_id> &accessor_ids) const {
   DCHECK_EQ(1u, accessor_ids.size())
       << "Got wrong number of attributes for SUM: " << accessor_ids.size();
 
   std::size_t num_tuples = 0;
-  TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-      blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
-  return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
+  TypedValue va_sum = accumulate_operator_->accumulateValueAccessor(
+      tv_blank_state_, accessor, accessor_ids.front(), &num_tuples);
+  va_sum.copyInto(state);
 }
 #endif
 
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}
-
-void AggregationHandleSum::mergeStates(const AggregationState &source,
-                                       AggregationState *destination) const {
-  const AggregationStateSum &sum_source =
-      static_cast<const AggregationStateSum &>(source);
-  AggregationStateSum *sum_destination =
-      static_cast<AggregationStateSum *>(destination);
-
-  SpinMutexLock lock(sum_destination->mutex_);
-  sum_destination->sum_ = merge_operator_->applyToTypedValues(
-      sum_destination->sum_, sum_source.sum_);
-  sum_destination->null_ = sum_destination->null_ && sum_source.null_;
-}
-
-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
-  const TypedValue *src_sum_ptr =
-      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
-  const bool *src_null_ptr =
-      reinterpret_cast<const bool *>(source + blank_state_.null_offset_);
-  TypedValue *dst_sum_ptr =
-      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
-  bool *dst_null_ptr =
-      reinterpret_cast<bool *>(destination + blank_state_.null_offset_);
-  *dst_sum_ptr =
-      merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
-  *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr);
-}
-
-TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
-  const AggregationStateSum &agg_state =
-      static_cast<const AggregationStateSum &>(state);
-  if (agg_state.null_) {
-    // SUM() over no values is NULL.
-    return result_type_->makeNullValue();
-  } else {
-    return agg_state.sum_;
-  }
-}
-
-ColumnVector* AggregationHandleSum::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleSum,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleSum,
-      AggregationStateSum>(distinctify_hash_table);
-}
-
-void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleSum,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 6c334a6..f45e87e 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -26,198 +26,39 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "utility/ScopedBuffer.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
 namespace quickstep {
 
-class ColumnVector;
-class StorageManager;
-class ValueAccessor;
-
 /** \addtogroup Expressions
  *  @{
  */
 
 /**
- * @brief Aggregation state for sum.
- */
-class AggregationStateSum : public AggregationState {
- public:
-  /**
-   * @brief Copy constructor (ignores mutex).
-   */
-  AggregationStateSum(const AggregationStateSum &orig)
-      : sum_(orig.sum_),
-        null_(orig.null_),
-        sum_offset_(orig.sum_offset_),
-        null_offset_(orig.null_offset_) {}
-
-  std::size_t getPayloadSize() const {
-    std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
-    std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
-    return (p2 - p1);
-  }
-
-  const std::uint8_t* getPayloadAddress() const {
-    return reinterpret_cast<const uint8_t *>(&sum_);
-  }
-
- private:
-  friend class AggregationHandleSum;
-
-  AggregationStateSum()
-      : sum_(0),
-        null_(true),
-        sum_offset_(0),
-        null_offset_(reinterpret_cast<std::uint8_t *>(&null_) -
-                     reinterpret_cast<std::uint8_t *>(&sum_)) {}
-
-  AggregationStateSum(TypedValue &&sum, const bool is_null)
-      : sum_(std::move(sum)), null_(is_null) {}
-
-  // TODO(shoban): We might want to specialize sum_ to use atomics for int types
-  // similar to in AggregationStateCount.
-  TypedValue sum_;
-  bool null_;
-  SpinMutex mutex_;
-
-  int sum_offset_, null_offset_;
-};
-
-/**
  * @brief An aggregationhandle for sum.
  **/
-class AggregationHandleSum : public AggregationConcreteHandle {
+class AggregationHandleSum : public AggregationHandle {
  public:
   ~AggregationHandleSum() override {}
 
-  AggregationState* createInitialState() const override {
-    return new AggregationStateSum(blank_state_);
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  inline void iterateUnaryInl(AggregationStateSum *state,
-                              const TypedValue &value) const {
-    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
-    if (value.isNull()) return;
-
-    SpinMutexLock lock(state->mutex_);
-    state->sum_ = fast_operator_->applyToTypedValues(state->sum_, value);
-    state->null_ = false;
-  }
-
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
-    if (value.isNull()) return;
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
-    bool *null_ptr =
-        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
-    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
-    *null_ptr = false;
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
-    bool *null_ptr =
-        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
-    *sum_ptr = blank_state_.sum_;
-    *null_ptr = true;
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
+  void accumulateColumnVectors(
+      void *state,
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  void accumulateValueAccessor(
+      void *state,
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
+      const std::vector<attribute_id> &accessor_ids) const override;
 #endif
 
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override;
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateSum &>(state).sum_;
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
-    return *sum_ptr;
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for SUM aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for SUM aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override {
-    return blank_state_.getPayloadSize();
-  }
-
  private:
   friend class AggregateFunctionSum;
 
@@ -226,15 +67,13 @@ class AggregationHandleSum : public AggregationConcreteHandle {
    *
    * @param type Type of the sum value.
    **/
-  explicit AggregationHandleSum(const Type &type);
+  explicit AggregationHandleSum(const Type &argument_type);
 
-  const Type &argument_type_;
-  const Type *result_type_;
-  AggregationStateSum blank_state_;
-  std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
-  std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
+  // TODO: temporary
+  TypedValue tv_blank_state_;
 
-  bool block_update_;
+  std::unique_ptr<UncheckedBinaryOperator> accumulate_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index e9503f7..7b369ae 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -43,9 +43,6 @@ add_library(quickstep_expressions_aggregation_AggregateFunctionMin
 add_library(quickstep_expressions_aggregation_AggregateFunctionSum
             AggregateFunctionSum.cpp
             AggregateFunctionSum.hpp)
-add_library(quickstep_expressions_aggregation_AggregationConcreteHandle
-            AggregationConcreteHandle.cpp
-            AggregationConcreteHandle.hpp)
 add_library(quickstep_expressions_aggregation_AggregationHandle
             ../../empty_src.cpp
             AggregationHandle.hpp)
@@ -55,9 +52,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg
 add_library(quickstep_expressions_aggregation_AggregationHandleCount
             AggregationHandleCount.cpp
             AggregationHandleCount.hpp)
-add_library(quickstep_expressions_aggregation_AggregationHandleDistinct
-            AggregationHandleDistinct.cpp
-            AggregationHandleDistinct.hpp)
 add_library(quickstep_expressions_aggregation_AggregationHandleMax
             AggregationHandleMax.cpp
             AggregationHandleMax.hpp)
@@ -142,34 +136,20 @@ target_link_libraries(quickstep_expressions_aggregation_AggregateFunctionSum
                       quickstep_types_operations_binaryoperations_BinaryOperationFactory
                       quickstep_types_operations_binaryoperations_BinaryOperationID
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandle
-                      glog
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
-                      quickstep_threading_SpinMutex
-                      quickstep_types_TypedValue
-                      quickstep_types_containers_ColumnVector
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_storage_HashTableBase
+                      quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
-                      quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
                       quickstep_types_TypeID
@@ -181,34 +161,23 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
+                      quickstep_types_LongType
                       quickstep_types_TypeFactory
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorUtil
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct
-                      glog
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_types_TypedValue
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -223,9 +192,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -240,21 +207,21 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
+                      quickstep_types_TypeFunctors
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_types_operations_binaryoperations_BinaryOperation
                       quickstep_types_operations_binaryoperations_BinaryOperationFactory
                       quickstep_types_operations_binaryoperations_BinaryOperationID
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 
 # Submodule all-in-one library:
 add_library(quickstep_expressions_aggregation ../../empty_src.cpp)
@@ -267,11 +234,9 @@ target_link_libraries(quickstep_expressions_aggregation
                       quickstep_expressions_aggregation_AggregateFunctionMax
                       quickstep_expressions_aggregation_AggregateFunctionMin
                       quickstep_expressions_aggregation_AggregateFunctionSum
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_expressions_aggregation_AggregationHandleCount
-                      quickstep_expressions_aggregation_AggregationHandleDistinct
                       quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationHashTable.hpp b/storage/AggregationHashTable.hpp
deleted file mode 100644
index fca6d4c..0000000
--- a/storage/AggregationHashTable.hpp
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdlib>
-#include <cstring>
-#include <limits>
-#include <memory>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/HashTableUntypedKeyManager.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "threading/SpinMutex.hpp"
-#include "threading/SpinSharedMutex.hpp"
-#include "types/Type.hpp"
-#include "types/TypeFunctors.hpp"
-#include "utility/Alignment.hpp"
-#include "utility/InlineMemcpy.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PrimeNumber.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Storage
- *  @{
- */
-
-template <bool use_mutex>
-class AggregationHashTablePayloadManager {
- public:
-  AggregationHashTablePayloadManager(const std::vector<AggregationHandle *> &handles)
-      : handles_(handles),
-        payload_size_in_bytes_(0) {
-    if (use_mutex) {
-      payload_size_in_bytes_ += sizeof(SpinMutex);
-    }
-    for (const AggregationHandle *handle : handles) {
-      const std::size_t state_size = handle->getStateSize();
-      agg_state_sizes_.emplace_back(state_size);
-      agg_state_offsets_.emplace_back(payload_size_in_bytes_);
-      payload_size_in_bytes_ += state_size;
-    }
-
-    initial_payload_ = std::malloc(payload_size_in_bytes_);
-    if (use_mutex) {
-      new(initial_payload_) Mutex;
-    }
-//    for (std::size_t i = 0; i < handles_.size(); ++i) {
-//      handles_[i]->initPayload(
-//          static_cast<std::uint8_t *>(initial_payload_) + agg_state_offsets_[i]);
-//    }
-  }
-
-  ~AggregationHashTablePayloadManager() {
-    std::free(initial_payload_);
-  }
-
-  inline std::size_t getPayloadSizeInBytes() const {
-    return payload_size_in_bytes_;
-  }
-
-  inline void updatePayload(void *payload) const {
-  }
-
-  inline void initPayload(void *payload) const {
-  }
-
- private:
-  std::vector<AggregationHandle *> handles_;
-
-  std::vector<std::size_t> agg_state_sizes_;
-  std::vector<std::size_t> agg_state_offsets_;
-  std::size_t payload_size_in_bytes_;
-
-  void *initial_payload_;
-
-  DISALLOW_COPY_AND_ASSIGN(AggregationHashTablePayloadManager);
-};
-
-class ThreadPrivateAggregationHashTable : public AggregationHashTableBase {
- public:
-  ThreadPrivateAggregationHashTable(const std::vector<const Type *> &key_types,
-                                    const std::size_t num_entries,
-                                    const std::vector<AggregationHandle *> &handles,
-                                    StorageManager *storage_manager)
-    : payload_manager_(handles),
-      key_types_(key_types),
-      key_manager_(this->key_types_, payload_manager_.getPayloadSizeInBytes()),
-      slots_(num_entries * kHashTableLoadFactor,
-             key_manager_.getUntypedKeyHashFunctor(),
-             key_manager_.getUntypedKeyEqualityFunctor()),
-      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize(),
-                                     payload_manager_.getPayloadSizeInBytes())),
-      buckets_allocated_(0),
-      storage_manager_(storage_manager) {
-    std::size_t num_storage_slots =
-        this->storage_manager_->SlotsNeededForBytes(num_entries);
-
-    // Get a StorageBlob to hold the hash table.
-    const block_id blob_id = this->storage_manager_->createBlob(num_storage_slots);
-    this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
-
-    buckets_ = this->blob_->getMemoryMutable();
-    num_buckets_ = num_storage_slots * kSlotSizeBytes / bucket_size_;
-  }
-
-  void resize() {
-    const std::size_t resized_memory_required = num_buckets_ * bucket_size_ * 2;
-    const std::size_t resized_storage_slots =
-        this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
-    const block_id resized_blob_id =
-        this->storage_manager_->createBlob(resized_storage_slots);
-    MutableBlobReference resized_blob =
-        this->storage_manager_->getBlobMutable(resized_blob_id);
-
-    void *resized_buckets = resized_blob->getMemoryMutable();
-    std::memcpy(resized_buckets, buckets_, buckets_allocated_ * bucket_size_);
-
-    for (auto &pair : slots_) {
-      pair.second =
-           (static_cast<const char *>(pair.first) - static_cast<char *>(buckets_))
-           + static_cast<char *>(resized_buckets);
-    }
-
-    buckets_ = resized_buckets;
-    num_buckets_ = resized_storage_slots * kSlotSizeBytes / bucket_size_;
-    std::swap(this->blob_, resized_blob);
-  }
-
-  bool upsertValueAccessor(ValueAccessor *accessor,
-                           const attribute_id key_attr_id,
-                           const std::vector<attribute_id> &argument_ids,
-                           const bool check_for_null_keys) override {
-    if (check_for_null_keys) {
-      return upsertValueAccessorInternal<true>(
-          accessor, key_attr_id, argument_ids);
-    } else {
-      return upsertValueAccessorInternal<false>(
-          accessor, key_attr_id, argument_ids);
-    }
-  }
-
-  template <bool check_for_null_keys>
-  bool upsertValueAccessorInternal(ValueAccessor *accessor,
-                                   const attribute_id key_attr_id,
-                                   const std::vector<attribute_id> &argument_ids) {
-    return InvokeOnAnyValueAccessor(
-        accessor,
-        [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-      accessor->beginIteration();
-      while (accessor->next()) {
-        const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id);
-        if (check_for_null_keys && key == nullptr) {
-          continue;
-        }
-        bool is_empty;
-        void *bucket = locateBucket(key, &is_empty);
-        if (is_empty) {
-          payload_manager_.initPayload(bucket);
-        } else {
-          payload_manager_.updatePayload(bucket);
-        }
-      }
-      return true;
-    });
-  }
-
-  bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
-                                       const std::vector<attribute_id> &key_attr_ids,
-                                       const std::vector<attribute_id> &argument_ids,
-                                       const bool check_for_null_keys) override {
-    if (check_for_null_keys) {
-      return upsertValueAccessorCompositeKeyInternal<true>(
-          accessor, key_attr_ids, argument_ids);
-    } else {
-      return upsertValueAccessorCompositeKeyInternal<false>(
-          accessor, key_attr_ids, argument_ids);
-    }
-  }
-
-  template <bool check_for_null_keys>
-  bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor,
-                                               const std::vector<attribute_id> &key_attr_ids,
-                                               const std::vector<attribute_id> &argument_ids) {
-    return InvokeOnAnyValueAccessor(
-        accessor,
-        [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-      accessor->beginIteration();
-      void *prealloc_bucket = allocateBucket();
-      while (accessor->next()) {
-        if (check_for_null_keys) {
-          const bool is_null =
-              key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
-                  accessor,
-                  key_attr_ids,
-                  prealloc_bucket);
-          if (is_null) {
-            continue;
-          }
-        } else {
-          key_manager_.writeUntypedKeyFromValueAccessorToBucket(
-              accessor,
-              key_attr_ids,
-              prealloc_bucket);
-        }
-        void *bucket = locateBucketWithPrealloc(prealloc_bucket);
-        if (bucket != prealloc_bucket) {
-          payload_manager_.initPayload(bucket);
-          prealloc_bucket = allocateBucket();
-        } else {
-          payload_manager_.updatePayload(bucket);
-        }
-      }
-      // Reclaim the last unused bucket
-      --buckets_allocated_;
-      return true;
-    });
-  }
-
-  inline void* locateBucket(const void *key, bool *is_empty) {
-    auto slot_it = slots_.find(key);
-    if (slot_it == slots_.end()) {
-      void *bucket = allocateBucket();
-      key_manager_.writeUntypedKeyToBucket(key, bucket);
-      slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
-      *is_empty = true;
-      return bucket;
-    } else {
-      *is_empty = false;
-      return slot_it->second;
-    }
-  }
-
-  inline void* locateBucketWithPrealloc(void *prealloc_bucket) {
-    const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket);
-    auto slot_it = slots_.find(key);
-    if (slot_it == slots_.end()) {
-      slots_.emplace(key, prealloc_bucket);
-      return prealloc_bucket;
-    } else {
-      return slot_it->second;
-    }
-  }
-
-  inline void* allocateBucket() {
-    if (buckets_allocated_ >= num_buckets_) {
-      resize();
-    }
-    void *bucket = static_cast<char *>(buckets_) + buckets_allocated_ * bucket_size_;
-    ++buckets_allocated_;
-    return bucket;
-  }
-
-  void print() const override {
-    std::cerr << "Bucket size = " << bucket_size_ << "\n";
-    std::cerr << "Buckets: \n";
-    for (const auto &pair : slots_) {
-      std::cerr << pair.first << " -- " << pair.second << "\n";
-      std::cerr << *static_cast<const int *>(pair.second) << "\n";
-    }
-  }
-
- private:
-  // Helper object to manage hash table payloads (i.e. aggregation states).
-  AggregationHashTablePayloadManager<false> payload_manager_;
-
-  // Type(s) of keys.
-  const std::vector<const Type*> key_types_;
-
-  // Helper object to manage key storage.
-  HashTableUntypedKeyManager key_manager_;
-
-  // Round bucket size up to a multiple of kBucketAlignment.
-  static std::size_t ComputeBucketSize(const std::size_t fixed_key_size,
-                                       const std::size_t total_payload_size) {
-    constexpr std::size_t kBucketAlignment = 4;
-    return (((fixed_key_size + total_payload_size - 1)
-               / kBucketAlignment) + 1) * kBucketAlignment;
-  }
-
-  std::unordered_map<const void *, void *,
-                     UntypedKeyHashFunctor,
-                     UntypedKeyEqualityFunctor> slots_;
-
-  void *buckets_;
-  const std::size_t bucket_size_;
-  std::size_t num_buckets_;
-  std::size_t buckets_allocated_;
-
-  StorageManager *storage_manager_;
-  MutableBlobReference blob_;
-
-  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateAggregationHashTable);
-};
-
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index fe16fc4..50e7c06 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -34,13 +34,11 @@
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunctionFactory.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
-#include "storage/AggregationHashTable.hpp"
 #include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
+#include "storage/AggregationStateHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTableFactory.hpp"
 #include "storage/InsertDestination.hpp"
@@ -88,122 +86,66 @@ AggregationOperationState::AggregationOperationState(
   std::vector<AggregationHandle *> group_by_handles;
   group_by_handles.clear();
 
-  if (aggregate_functions.size() == 0) {
-    // If there is no aggregation function, then it is a distinctify operation
-    // on the group-by expressions.
-    DCHECK_GT(group_by_list_.size(), 0u);
-
-    handles_.emplace_back(new AggregationHandleDistinct());
-    arguments_.push_back({});
-    is_distinct_.emplace_back(false);
-    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                     hash_table_impl_type,
-                                                     group_by_types,
-                                                     {1},
-                                                     handles_,
-                                                     storage_manager));
-  } else {
-    // Set up each individual aggregate in this operation.
-    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
-        aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
-        args_it = arguments_.begin();
-    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
-    std::vector<HashTableImplType>::const_iterator
-        distinctify_hash_table_impl_types_it =
-            distinctify_hash_table_impl_types.begin();
-    std::vector<std::size_t> payload_sizes;
-    for (; agg_func_it != aggregate_functions.end();
-         ++agg_func_it, ++args_it, ++is_distinct_it) {
-      // Get the Types of this aggregate's arguments so that we can create an
-      // AggregationHandle.
-      std::vector<const Type *> argument_types;
-      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-        argument_types.emplace_back(&argument->getType());
-      }
-
-      // Sanity checks: aggregate function exists and can apply to the specified
-      // arguments.
-      DCHECK(*agg_func_it != nullptr);
-      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
-
-      // Have the AggregateFunction create an AggregationHandle that we can use
-      // to do actual aggregate computation.
-      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
+  // Set up each individual aggregate in this operation.
+  for (std::size_t i = 0; i < aggregate_functions.size(); ++i) {
+    // Get the Types of this aggregate's arguments so that we can create an
+    // AggregationHandle.
+    std::vector<const Type *> argument_types;
+    for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+      argument_types.emplace_back(&argument->getType());
+    }
 
-      if (!group_by_list_.empty()) {
-        // Aggregation with GROUP BY: combined payload is partially updated in
-        // the presence of DISTINCT.
-        if (*is_distinct_it) {
-          handles_.back()->blockUpdate();
-        }
-        group_by_handles.emplace_back(handles_.back());
-        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
-      } else {
-        // Aggregation without GROUP BY: create a single global state.
-        single_states_.emplace_back(handles_.back()->createInitialState());
+    // Sanity checks: aggregate function exists and can apply to the specified
+    // arguments.
+    const AggregateFunction *agg_func = aggregate_functions[i];
+    DCHECK(agg_func != nullptr);
+    DCHECK(agg_func->canApplyToTypes(argument_types));
+
+    // Have the AggregateFunction create an AggregationHandle that we can use
+    // to do actual aggregate computation.
+    handles_.emplace_back(agg_func->createHandle(argument_types));
+
+    if (!group_by_list_.empty()) {
+      // TODO(jianqiao): handle DISTINCT aggregation.
+      // if (is_distinct[i]) {
+      // }
+      group_by_handles.emplace_back(handles_.back());
+    } else {
+      // Aggregation without GROUP BY: create a single global state.
+      single_states_.emplace_back(handles_.back()->createInitialState());
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-        // See if all of this aggregate's arguments are attributes in the input
-        // relation. If so, remember the attribute IDs so that we can do copy
-        // elision when actually performing the aggregation.
-        std::vector<attribute_id> local_arguments_as_attributes;
-        local_arguments_as_attributes.reserve(args_it->size());
-        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id =
-              argument->getAttributeIdForValueAccessor();
-          if (argument_id == -1) {
-            local_arguments_as_attributes.clear();
-            break;
-          } else {
-            DCHECK_EQ(input_relation_.getID(),
-                      argument->getRelationIdForValueAccessor());
-            local_arguments_as_attributes.push_back(argument_id);
-          }
+      // See if all of this aggregate's arguments are attributes in the input
+      // relation. If so, remember the attribute IDs so that we can do copy
+      // elision when actually performing the aggregation.
+      std::vector<attribute_id> local_arguments_as_attributes;
+      local_arguments_as_attributes.reserve(arguments[i].size());
+      for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+        const attribute_id argument_id =
+            argument->getAttributeIdForValueAccessor();
+        if (argument_id == -1) {
+          local_arguments_as_attributes.clear();
+          break;
+        } else {
+          DCHECK_EQ(input_relation_.getID(),
+                    argument->getRelationIdForValueAccessor());
+          local_arguments_as_attributes.push_back(argument_id);
         }
-
-        arguments_as_attributes_.emplace_back(
-            std::move(local_arguments_as_attributes));
-#endif
       }
 
-      // Initialize the corresponding distinctify hash table if this is a
-      // DISTINCT
-      // aggregation.
-      if (*is_distinct_it) {
-        std::vector<const Type *> key_types(group_by_types);
-        key_types.insert(
-            key_types.end(), argument_types.begin(), argument_types.end());
-        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating
-        // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value
-        // during
-        // query optimization, if it worths.
-        distinctify_hashtables_.emplace_back(
-            AggregationStateFastHashTableFactory::CreateResizable(
-                *distinctify_hash_table_impl_types_it,
-                key_types,
-                estimated_num_entries,
-                {0},
-                {},
-                storage_manager));
-        ++distinctify_hash_table_impl_types_it;
-      } else {
-        distinctify_hashtables_.emplace_back(nullptr);
-      }
+      arguments_as_attributes_.emplace_back(
+          std::move(local_arguments_as_attributes));
+#endif
     }
+  }
 
-    if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group
-      // states.
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types,
-                                                       payload_sizes,
-                                                       group_by_handles,
-                                                       storage_manager));
-    }
+  if (!group_by_handles.empty()) {
+    // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                     hash_table_impl_type,
+                                                     group_by_types,
+                                                     group_by_handles,
+                                                     storage_manager));
   }
 }
 
@@ -352,12 +294,12 @@ void AggregationOperationState::finalizeAggregate(
 }
 
 void AggregationOperationState::mergeSingleState(
-    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+    const std::vector<ScopedBuffer> &local_state) {
   DEBUG_ASSERT(local_state.size() == single_states_.size());
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (!is_distinct_[agg_idx]) {
-      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
-                                     single_states_[agg_idx].get());
+      handles_[agg_idx]->mergeStates(single_states_[agg_idx].get(),
+                                     local_state[agg_idx].get());
     }
   }
 }
@@ -365,7 +307,7 @@ void AggregationOperationState::mergeSingleState(
 void AggregationOperationState::aggregateBlockSingleState(
     const block_id input_block) {
   // Aggregate per-block state for each aggregate.
-  std::vector<std::unique_ptr<AggregationState>> local_state;
+  std::vector<ScopedBuffer> local_state;
 
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
@@ -386,14 +328,7 @@ void AggregationOperationState::aggregateBlockSingleState(
       // Call StorageBlock::aggregateDistinct() to put the arguments as keys
       // directly into the (threadsafe) shared global distinctify HashTable
       // for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               local_arguments_as_attributes,
-                               {}, /* group_by */
-                               predicate_.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               &reuse_matches,
-                               nullptr /* reuse_group_by_vectors */);
+      // TODO(jianqiao): handle DISTINCT aggregation.
       local_state.emplace_back(nullptr);
     } else {
       // Call StorageBlock::aggregate() to actually do the aggregation.
@@ -426,18 +361,10 @@ void AggregationOperationState::aggregateBlockHashTable(
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
-      // expression
+      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
       // values and the aggregation arguments together as keys directly into the
       // (threadsafe) shared global distinctify HashTable for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               nullptr, /* arguments_as_attributes */
-                               group_by_list_,
-                               predicate_.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               &reuse_matches,
-                               &reuse_group_by_vectors);
+      // TODO(jianqiao): handle DISTINCT aggregation.
     }
   }
 
@@ -445,16 +372,13 @@ void AggregationOperationState::aggregateBlockHashTable(
   // directly into the (threadsafe) shared global HashTable for this
   // aggregate.
   DCHECK(group_by_hashtable_pool_ != nullptr);
-  AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTableFast();
+  auto *agg_hash_table = group_by_hashtable_pool_->getHashTable();
   DCHECK(agg_hash_table != nullptr);
 
   block->aggregateGroupBy(arguments_,
                           group_by_list_,
                           predicate_.get(),
                           agg_hash_table,
-                          group_by_hashtable_pool_->createNewThreadPrivateHashTable(),
-//                          nullptr,
                           &reuse_matches,
                           &reuse_group_by_vectors);
   group_by_hashtable_pool_->returnHashTable(agg_hash_table);
@@ -468,23 +392,23 @@ void AggregationOperationState::finalizeSingleState(
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      single_states_[agg_idx].reset(
-          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
-              *distinctify_hashtables_[agg_idx]));
+      // TODO(jianqiao): handle DISTINCT aggregation
     }
 
     attribute_values.emplace_back(
-        handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+        handles_[agg_idx]->finalize(single_states_[agg_idx].get()));
   }
 
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
 void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
-  HashTableMergerFast merger(dst);
-  (static_cast<FastHashTable<true, false, true, false> *>(src))
-      ->forEachCompositeKeyFast(&merger);
+    AggregationStateHashTableBase *destination_hash_table,
+    const AggregationStateHashTableBase *source_hash_table) {
+  static_cast<ThreadPrivateAggregationStateHashTable *>(
+      destination_hash_table)->mergeHashTable(
+          static_cast<const ThreadPrivateAggregationStateHashTable *>(
+              source_hash_table));
 }
 
 void AggregationOperationState::finalizeHashTable(
@@ -501,103 +425,22 @@ void AggregationOperationState::finalizeHashTable(
   // e.g. Keep merging entries from smaller hash tables to larger.
 
   auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  if (hash_tables->size() > 1) {
-    for (int hash_table_index = 0;
-         hash_table_index < static_cast<int>(hash_tables->size() - 1);
-         ++hash_table_index) {
-      // Merge each hash table to the last hash table.
-      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
-                             hash_tables->back().get());
-    }
-  }
-
-  // Collect per-aggregate finalized values.
-  std::vector<std::unique_ptr<ColumnVector>> final_values;
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pool_ != nullptr);
-      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      DCHECK(hash_tables != nullptr);
-      if (hash_tables->empty()) {
-        // We may have a case where hash_tables is empty, e.g. no input blocks.
-        // However for aggregateOnDistinctifyHashTableForGroupBy to work
-        // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pool_->getHashTableFast();
-        group_by_hashtable_pool_->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      }
-      DCHECK(hash_tables->back() != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-      DCHECK(agg_hash_table != nullptr);
-      handles_[agg_idx]->allowUpdate();
-      handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
-    }
-
-    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(hash_tables != nullptr);
-    if (hash_tables->empty()) {
-      // We may have a case where hash_tables is empty, e.g. no input blocks.
-      // However for aggregateOnDistinctifyHashTableForGroupBy to work
-      // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pool_->getHashTableFast();
-      group_by_hashtable_pool_->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-    AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-    DCHECK(agg_hash_table != nullptr);
-    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
-    if (agg_result_col != nullptr) {
-      final_values.emplace_back(agg_result_col);
-    }
-  }
-
-  // Reorganize 'group_by_keys' in column-major order so that we can make a
-  // ColumnVectorsValueAccessor to bulk-insert results.
-  //
-  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
-  // if there is only one aggregate. The need to do this should hopefully go
-  // away when we work out storing composite structures for multiple aggregates
-  // in a single HashTable.
-  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
-  std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv =
-          new NativeColumnVector(group_by_type, group_by_keys.size());
-      group_by_cvs.emplace_back(element_cv);
-      for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
-      }
-    } else {
-      IndirectColumnVector *element_cv =
-          new IndirectColumnVector(group_by_type, group_by_keys.size());
-      group_by_cvs.emplace_back(element_cv);
-      for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
-      }
-    }
-    ++group_by_element_idx;
+  if (hash_tables->size() == 0) {
+    return;
   }
 
-  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
-  // and the finalized aggregates.
-  ColumnVectorsValueAccessor complete_result;
-  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
-    complete_result.addColumn(group_by_cv.release());
-  }
-  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
-    complete_result.addColumn(final_value_cv.release());
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+      hash_tables->back().release());
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    mergeGroupByHashTables(final_hash_table.get(), hash_table.get());
   }
 
   // Bulk-insert the complete result.
-  output_destination->bulkInsertTuples(&complete_result);
+  std::unique_ptr<AggregationResultIterator> results(
+      final_hash_table->createResultIterator());
+  output_destination->bulkInsertAggregationResults(results.get());
 }
 
 }  // namespace quickstep