You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/17 21:56:04 UTC

[6/6] incubator-quickstep git commit: Updates

Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ccd5a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ccd5a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ccd5a31

Branch: refs/heads/untyped-agg
Commit: 9ccd5a31189823f0e63b0a34ec02978c8695b25f
Parents: 140069b
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Oct 17 16:55:49 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Oct 17 16:55:49 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |   68 -
 .../aggregation/AggregationConcreteHandle.hpp   |  325 ---
 expressions/aggregation/AggregationHandle.hpp   |  405 +--
 .../aggregation/AggregationHandleAvg.cpp        |  321 ++-
 .../aggregation/AggregationHandleAvg.hpp        |  200 +-
 .../aggregation/AggregationHandleCount.cpp      |  134 +-
 .../aggregation/AggregationHandleCount.hpp      |  183 +-
 .../aggregation/AggregationHandleDistinct.cpp   |   81 -
 .../aggregation/AggregationHandleDistinct.hpp   |  130 -
 .../aggregation/AggregationHandleMax.cpp        |  190 +-
 .../aggregation/AggregationHandleMax.hpp        |  174 +-
 .../aggregation/AggregationHandleMin.cpp        |  192 +-
 .../aggregation/AggregationHandleMin.hpp        |  176 +-
 .../aggregation/AggregationHandleSum.cpp        |  128 +-
 .../aggregation/AggregationHandleSum.hpp        |  187 +-
 expressions/aggregation/CMakeLists.txt          |   49 +-
 storage/AggregationHashTable.hpp                |  330 ---
 storage/AggregationOperationState.cpp           |  317 +--
 storage/AggregationOperationState.hpp           |   20 +-
 storage/AggregationResultIterator.hpp           |  104 +
 storage/AggregationStateHashTable.hpp           |  338 +++
 storage/AggregationStateManager.hpp             |  181 ++
 storage/CMakeLists.txt                          |   98 +-
 storage/FastHashTable.hpp                       | 2515 ------------------
 storage/FastHashTableFactory.hpp                |  257 --
 storage/FastSeparateChainingHashTable.hpp       | 1734 ------------
 storage/HashTableBase.hpp                       |   43 +-
 storage/HashTablePool.hpp                       |  109 +-
 storage/HashTableUntypedKeyManager.hpp          |   34 +-
 storage/InsertDestination.cpp                   |   18 +
 storage/InsertDestination.hpp                   |    3 +
 storage/PackedRowStoreTupleStorageSubBlock.cpp  |   30 +
 storage/PackedRowStoreTupleStorageSubBlock.hpp  |    3 +
 storage/StorageBlock.cpp                        |  152 +-
 storage/StorageBlock.hpp                        |   55 +-
 storage/TupleStorageSubBlock.hpp                |    7 +
 types/CharType.hpp                              |   12 +
 types/DateType.hpp                              |   13 +-
 types/DatetimeIntervalType.hpp                  |   12 +-
 types/DatetimeLit.hpp                           |    4 +
 types/DatetimeType.hpp                          |   12 +-
 types/NullType.hpp                              |    4 +
 types/NumericSuperType.hpp                      |   11 +-
 types/Type.hpp                                  |    9 +-
 types/TypeFunctors.cpp                          |   54 +-
 types/TypeFunctors.hpp                          |   15 +-
 types/VarCharType.hpp                           |   12 +
 types/YearMonthIntervalType.hpp                 |   12 +-
 .../ArithmeticBinaryOperators.hpp               |   20 +
 .../binary_operations/BinaryOperation.hpp       |    7 +
 50 files changed, 1592 insertions(+), 7896 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
deleted file mode 100644
index e3fb520..0000000
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-
-#include <cstddef>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
-namespace quickstep {
-
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &key_types,
-    const std::size_t estimated_num_distinct_keys,
-    StorageManager *storage_manager) const {
-  // Create a hash table with key types as key_types and value type as bool.
-  return AggregationStateHashTableFactory<bool>::CreateResizable(
-      hash_table_impl,
-      key_types,
-      estimated_num_distinct_keys,
-      storage_manager);
-}
-
-void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &key_ids,
-    AggregationStateHashTableBase *distinctify_hash_table) const {
-  // If the key-value pair is already there, we don't need to update the value,
-  // which should always be "true". I.e. the value is just a placeholder.
-
-  AggregationStateFastHashTable *hash_table =
-      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
-  if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessorFast(
-        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
-  } else {
-    std::vector<attribute_id> empty_args {kInvalidAttributeID};
-    hash_table->upsertValueAccessorCompositeKeyFast(
-        empty_args, accessor, key_ids, true /* check_for_null_keys */);
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
deleted file mode 100644
index 398a032..0000000
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
-
-#include <cstddef>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- *  @{
- */
-
-/**
- * @brief An upserter class for modifying the destination hash table while
- *        merging two group by hash tables.
- **/
-template <typename HandleT>
-class HashTableStateUpserterFast {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param handle The aggregation handle being used.
-   * @param source_state The aggregation state in the source aggregation hash
-   *        table. The corresponding state (for the same key) in the destination
-   *        hash table will be upserted.
-   **/
-  HashTableStateUpserterFast(const HandleT &handle,
-                             const std::uint8_t *source_state)
-      : handle_(handle), source_state_(source_state) {}
-
-  /**
-   * @brief The operator for the functor required for the upsert.
-   *
-   * @param destination_state The aggregation state in the aggregation hash
-   *        table that is being upserted.
-   **/
-  void operator()(std::uint8_t *destination_state) {
-    handle_.mergeStatesFast(source_state_, destination_state);
-  }
-
- private:
-  const HandleT &handle_;
-  const std::uint8_t *source_state_;
-
-  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
-};
-
-/**
- * @brief The helper intermediate subclass of AggregationHandle that provides
- *        virtual method implementations as well as helper methods that are
- *        shared among all its subclasses.
- *
- * @note The reason that we have this intermediate class instead of putting
- *       everything inside AggregationHandle is to avoid cyclic dependency, e.g.
- *       when HashTable has to be used.
- **/
-class AggregationConcreteHandle : public AggregationHandle {
- public:
-  /**
-   * @brief Default implementaion for AggregationHandle::accumulateNullary().
-   */
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
-               << "takes at least one argument.";
-  }
-
-  /**
-   * @brief Implementaion for AggregationHandle::createDistinctifyHashTable()
-   *        that creates a new HashTable for the distinctify step for
-   *        DISTINCT aggregation.
-   */
-  AggregationStateHashTableBase* createDistinctifyHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &key_types,
-      const std::size_t estimated_num_distinct_keys,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Implementaion for
-   * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
-   * that inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
-   */
-  void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const override;
-
- protected:
-  AggregationConcreteHandle() {}
-
-  template <typename HandleT, typename StateT>
-  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
-      const AggregationStateHashTableBase &distinctify_hash_table) const;
-
-  template <typename HandleT, typename HashTableT>
-  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *hash_table,
-      std::size_t index) const;
-
-  template <typename HandleT, typename HashTableT>
-  ColumnVector* finalizeHashTableHelperFast(
-      const Type &result_type,
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const;
-
-  template <typename HandleT, typename HashTableT>
-  inline TypedValue finalizeGroupInHashTableFast(
-      const AggregationStateHashTableBase &hash_table,
-      const std::vector<TypedValue> &group_key,
-      int index) const {
-    const std::uint8_t *group_state =
-        static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
-    DCHECK(group_state != nullptr)
-        << "Could not find entry for specified group_key in HashTable";
-    return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
-        group_state);
-  }
-
-  template <typename HandleT, typename HashTableT>
-  void mergeGroupByHashTablesHelperFast(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
-};
-
-/**
- * @brief Templated helper class used to implement
- *        AggregationHandle::finalizeHashTable() by visiting each entry (i.e.
- *        GROUP) in a HashTable, finalizing the aggregation for the GROUP, and
- *        collecting the GROUP BY key values and the final aggregate values in
- *        a ColumnVector.
- **/
-template <typename HandleT, typename ColumnVectorT>
-class HashTableAggregateFinalizer {
- public:
-  HashTableAggregateFinalizer(
-      const HandleT &handle,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      ColumnVectorT *output_column_vector)
-      : handle_(handle),
-        group_by_keys_(group_by_keys),
-        output_column_vector_(output_column_vector) {}
-
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const AggregationState &group_state) {
-    group_by_keys_->emplace_back(group_by_key);
-    output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntry(group_state));
-  }
-
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const unsigned char *byte_ptr) {
-    group_by_keys_->emplace_back(group_by_key);
-    output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntryFast(byte_ptr));
-  }
-
- private:
-  const HandleT &handle_;
-  std::vector<std::vector<TypedValue>> *group_by_keys_;
-  ColumnVectorT *output_column_vector_;
-};
-
-/** @} */
-
-// ----------------------------------------------------------------------------
-// Implementations of templated methods follow:
-
-template <typename HandleT, typename StateT>
-StateT* AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
-        const AggregationStateHashTableBase &distinctify_hash_table) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  StateT *state = static_cast<StateT *>(createInitialState());
-
-  // A lambda function which will be called on each key from the distinctify
-  // hash table.
-  const auto aggregate_functor = [&handle, &state](
-      const TypedValue &key, const std::uint8_t &dumb_placeholder) {
-    // For each (unary) key in the distinctify hash table, aggregate the key
-    // into "state".
-    handle.iterateUnaryInl(state, key);
-  };
-
-  const AggregationStateFastHashTable &hash_table =
-      static_cast<const AggregationStateFastHashTable &>(
-          distinctify_hash_table);
-  // Invoke the lambda function "aggregate_functor" on each key from the
-  // distinctify hash table.
-  hash_table.forEach(&aggregate_functor);
-
-  return state;
-}
-
-template <typename HandleT, typename HashTableT>
-void AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
-        const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  HashTableT *target_hash_table =
-      static_cast<HashTableT *>(aggregation_hash_table);
-
-  // A lambda function which will be called on each key-value pair from the
-  // distinctify hash table.
-  const auto aggregate_functor = [&handle, &target_hash_table, &index](
-      std::vector<TypedValue> &key, const bool &dumb_placeholder) {
-    // For each (composite) key vector in the distinctify hash table with size N.
-    // The first N-1 entries are GROUP BY columns and the last entry is the
-    // argument to be aggregated on.
-    const TypedValue argument(std::move(key.back()));
-    key.pop_back();
-
-    // An upserter as lambda function for aggregating the argument into its
-    // GROUP BY group's entry inside aggregation_hash_table.
-    const auto upserter = [&handle, &argument](std::uint8_t *state) {
-      handle.iterateUnaryInlFast(argument, state);
-    };
-
-    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
-  };
-
-  const HashTableT &source_hash_table =
-      static_cast<const HashTableT &>(distinctify_hash_table);
-  // Invoke the lambda function "aggregate_functor" on each composite key vector
-  // from the distinctify hash table.
-  source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
-}
-
-template <typename HandleT, typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
-    const Type &result_type,
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  const HashTableT &hash_table_concrete =
-      static_cast<const HashTableT &>(hash_table);
-
-  if (group_by_keys->empty()) {
-    if (NativeColumnVector::UsableForType(result_type)) {
-      NativeColumnVector *result =
-          new NativeColumnVector(result_type, hash_table_concrete.numEntries());
-      HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
-          handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
-      return result;
-    } else {
-      IndirectColumnVector *result = new IndirectColumnVector(
-          result_type, hash_table_concrete.numEntries());
-      HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
-          handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
-      return result;
-    }
-  } else {
-    if (NativeColumnVector::UsableForType(result_type)) {
-      NativeColumnVector *result =
-          new NativeColumnVector(result_type, group_by_keys->size());
-      for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
-        result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
-      }
-      return result;
-    } else {
-      IndirectColumnVector *result = new IndirectColumnVector(
-          result_type, hash_table_concrete.numEntries());
-      for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
-        result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
-      }
-      return result;
-    }
-  }
-}
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 19f28ff..40d5e26 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -21,14 +21,19 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_HPP_
 
 #include <cstddef>
+#include <cstring>
+#include <functional>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTableBase.hpp"
+#include "types/Type.hpp"
 #include "types/TypedValue.hpp"
+#include "utility/ScopedBuffer.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 
 class ColumnVector;
@@ -40,66 +45,10 @@ class ValueAccessor;
  *  @{
  */
 
-/**
- * @brief Abstract base class for aggregation state.
- **/
-class AggregationState {
- public:
-  /**
-   * @brief Default constructor.
-   **/
-  AggregationState() {}
+typedef std::function<void (void *, const void *)> AggregationStateAccumulateFunctor;
+typedef std::function<void (void *, const void *)> AggregationStateMergeFunctor;
+typedef std::function<void (void *, const void *)> AggregationStateFinalizeFunctor;
 
-  /**
-   * @brief Pure virtual destructor.
-   **/
-  virtual ~AggregationState() = 0;
-};
-
-// Destructor should be defined. This will be called when derived class
-// destructor is called.
-inline AggregationState::~AggregationState() {}
-
-/**
- * @brief AggregationHandle encapsulates logic for actually computing
- *        aggregates with particular argument(s).
- * @note See also AggregateFunction, which represents a SQL aggregate function
- *       in the abstract sense.
- *
- * An AggregationHandle is created by calling
- * AggregateFunction::createHandle(). The AggregationHandle object provides
- * methods that are used to actually compute the aggregate, storing
- * intermediate results in AggregationState objects.
- *
- * I. The work-flow for computing an aggregate without GROUP BY is as follows:
- *     1. Create a global state for the aggregate with createInitialState().
- *     2. For each block in a relation (parallelizable):
- *        a. Call StorageBlock::aggregate() to accumulate results from the
- *           block (under the covers, this calls either
- *           accumulateColumnVectors() or accumulateValueAccessor() to do the
- *           actual per-block aggregation in a vectorized fashion).
- *        b. Merge the per-block results back with the global state by calling
- *           mergeStates() (this is threadsafe).
- *     3. Generate the final result by calling finalize() on the global state.
- *
- * II. The work-flow for computing an aggregate with GROUP BY is as follows:
- *     1. Create a HashTable to hold per-group states by calling
- *        createGroupByHashTable().
- *     2. For each block in a relation (parallelizable):
- *        a. Call StorageBlock::aggregateGroupBy() to update the states in the
- *           HashTable according to the values in the block (under the covers,
- *           this calls aggregateValueAccessorIntoHashTable() to aggregate over
- *           all the values/groups in a block in one shot; this is threadsafe).
- *     3. Generate the final set of groups and their corresponding results by
- *        calling finalizeHashTable().
- *
- * See also AggregationOperationState, which holds 1 or more global states or
- * HashTables for an aggregate query, and has some logic to re-use common
- * information across multiple aggregates being computed on the same block
- * (e.g. the set of matches for a predicate, or the values of computed GROUP BY
- * expressions). AggregationOperationState also has a method to write out
- * finalized aggregate values to an InsertDestination.
- **/
 class AggregationHandle {
  public:
   /**
@@ -109,67 +58,25 @@ class AggregationHandle {
   virtual ~AggregationHandle() {}
 
   /**
-   * @brief Create an initial "blank" state for this aggregation.
-   *
-   * @return An initial "blank" state for this particular aggregation.
-   **/
-  virtual AggregationState* createInitialState() const = 0;
-
-  virtual std::size_t getStateSize() const {
-    return 0;
-  }
-
-  /**
-   * @brief Create a new HashTable for aggregation with GROUP BY.
-   *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
-   * @param group_by_types The types of the GROUP BY columns/expressions. These
-   *        correspond to the (composite) key type for the HashTable.
-   * @param estimated_num_groups The estimated number of distinct groups for
-   *        the GROUP BY aggregation. This is used to size the initial
-   *        HashTable. This is an estimate only, and the HashTable will be
-   *        resized if it becomes over-full.
-   * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's
-   *        in-memory storage.
-   * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate.
-   **/
-  virtual AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const = 0;
-
-  /**
    * @brief Accumulate over tuples for a nullary aggregate function (one that
    *        has zero arguments, i.e. COUNT(*)).
-   *
-   * @param num_tuples The number of tuples to "accumulate". No actual tuple
-   *        data is accessed, the only thing that a nullary aggeregate can know
-   *        about input is its cardinality.
-   * @return A new AggregationState which contains the accumulated results from
-   *         applying the (nullary) aggregate to the specified number of
-   *         tuples.
    **/
-  virtual AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const = 0;
+  virtual void accumulateNullary(void *state,
+                                 const std::size_t num_tuples) const {
+    LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
+               << "takes at least one argument.";
+  }
 
   /**
    * @brief Accumulate (iterate over) all values in one or more ColumnVectors
    *        and return a new AggregationState which can be merged with other
    *        states or finalized.
-   *
-   * @param column_vectors One or more ColumnVectors that the aggregate will be
-   *        applied to. These correspond to the aggregate function's arguments,
-   *        in order.
-   * @return A new AggregationState which contains the accumulated results from
-   *         applying the aggregate to column_vectors. Caller is responsible
-   *         for deleting the returned AggregationState.
    **/
-  virtual AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
+  virtual void accumulateColumnVectors(
+      void *state,
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+    LOG(FATAL) << "Not implemented\n";
+  }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   /**
@@ -186,243 +93,79 @@ class AggregationHandle {
    *         applying the aggregate to the specified columns in accessor.
    *         Caller is responsible for deleting the returned AggregationState.
    **/
-  virtual AggregationState* accumulateValueAccessor(
+  virtual void accumulateValueAccessor(
+      void *state,
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const = 0;
+      const std::vector<attribute_id> &accessor_ids) const {
+    LOG(FATAL) << "Not implemented\n";
+  }
 #endif
 
   /**
-   * @brief Perform an aggregation with GROUP BY over all the tuples accessible
-   *        through a ValueAccessor, upserting states in a HashTable.
-   *
-   * @note Implementations of this method are threadsafe with respect to
-   *       hash_table, and can be called concurrently from multiple threads
-   *       with the same HashTable object.
-   *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param argument_ids The attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
-   * @param group_by_key_ids The attribute_ids of the group-by
-   *        columns/expressions in accessor.
-   * @param hash_table The HashTable to upsert AggregationStates in. This
-   *        should have been created by calling createGroupByHashTable() on
-   *        this same AggregationHandle.
-   **/
-  virtual void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const = 0;
-
-  /**
-   * @brief Merge two AggregationStates, updating one in-place. This computes a
-   *        new intermediate result and stores it in the destination
-   *        AggregationState.
-   *
-   * @note The order of arguments (i.e. which AggregationState is source and
-   *       which is destination) is not important for the result of the merging
-   *       process, but it does determine which of the two states is
-   *       overwritten (destination) and which remains unchanged (source).
-   * @note Implementations of this method are threadsafe with respect to the
-   *       destination state, and can be called concurrently from multiple
-   *       threads with the same state object.
-   *
-   * @param source The AggregationState to merge "from".
-   * @param destination The AggregationState to merge "to". The internal state
-   *        will be overwritten with the merged result.
-   **/
-  virtual void mergeStates(const AggregationState &source,
-                           AggregationState *destination) const = 0;
-
-  /**
-   * @brief Computes and returns the resulting aggregate by using intermediate
-   *        result saved in this handle.
-   *
-   * @note Except for count, SQL89 aggeregates return NULL when no rows are
-   *       selected.
-   * @warning It is dangerous to assume that calling mergeStates() or some
-   *          iterate method after previous finalize call will work correctly
-   *          when the aggregate function is not one of SQL89 aggregates (SUM,
-   *          COUNT, MIN, MAX, AVG).
-   *
-   * @return The result of this aggregation.
+   * @brief Get the number of bytes needed to store the aggregation handle's
+   *        state.
    **/
-  virtual TypedValue finalize(const AggregationState &state) const = 0;
+  inline std::size_t getStateSize() const {
+    return state_size_;
+  }
 
-  /**
-   * @brief Compute and return finalized aggregates for all groups in a
-   *        HashTable.
-   *
-   * @param hash_table The HashTable to finalize states from. This should have
-   *        have been created by calling createGroupByHashTable() on this same
-   *        AggregationHandle.
-   * @param group_by_keys A pointer to a vector of vectors of GROUP BY keys. If
-   *        this is initially empty, it will be filled in with the GROUP BY
-   *        keys visited by this method in the same order as the finalized
-   *        values returned in the ColumnVector. If this is already filled in,
-   *        then this method will visit the GROUP BY keys in the exact order
-   *        specified.
-   * @param index The index of the AggregationHandle to be finalized.
-   *
-   * @return A ColumnVector containing each group's finalized aggregate value.
-   **/
-  virtual ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const = 0;
+  inline void initializeState(void *state) const {
+    std::memcpy(state, blank_state_.get(), state_size_);
+  }
 
-  /**
-   * @brief Create a new HashTable for the distinctify step for DISTINCT
-   * aggregation.
-   *
-   * Distinctify is the first step for DISTINCT aggregation. This step inserts
-   * the GROUP BY expression values and aggregation arguments together as keys
-   * into the distinctify hash table, so that arguments are distinctified within
-   * each GROUP BY group. Later, a second-round aggregation on the distinctify
-   * hash table will be performed to actually compute the aggregated result for
-   * each GROUP BY group.
-   *
-   * In the case of single aggregation where there is no GROUP BY expressions,
-   * we simply treat it as a special GROUP BY case that the GROUP BY expression
-   * vector is empty.
-   *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
-   * @param key_types The types of the GROUP BY expressions together with the
-   *        types of the aggregation arguments.
-   * @param estimated_num_distinct_keys The estimated number of distinct keys
-   *        (i.e. GROUP BY expressions together with aggregation arguments) for
-   *        the distinctify step. This is used to size the initial HashTable.
-   *        This is an estimate only, and the HashTable will be resized if it
-   *        becomes over-full.
-   * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's
-   *        in-memory storage.
-   *
-   * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate.
-   */
-  virtual AggregationStateHashTableBase* createDistinctifyHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &key_types,
-      const std::size_t estimated_num_distinct_keys,
-      StorageManager *storage_manager) const = 0;
+  inline ScopedBuffer createInitialState() const {
+    ScopedBuffer state(state_size_, false);
+    initializeState(state.get());
+    return state;
+  }
 
-  /**
-   * @brief Inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
-   *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
-   *        together with the attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
-   * @param distinctify_hash_table The HashTable to store the GROUP BY
-   *        expressions and the aggregation arguments together as hash table
-   *        keys and a bool constant \c true as hash table value (So the hash
-   *        table actually serves as a hash set). This should have been created
-   *        by calling createDistinctifyHashTable();
-   */
-  virtual void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const = 0;
+  inline void mergeStates(void *destination_state,
+                          const void *source_state) const {
+    merge_functor_(destination_state, source_state);
+  }
 
-  /**
-   * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
-   * the distinctify hash table to actually compute the aggregated results.
-   *
-   * @param distinctify_hash_table Hash table which stores the distinctified
-   *        aggregation arguments as hash table keys. This should have been
-   *        created by calling createDistinctifyHashTable();
-   * @return A new AggregationState which contains the aggregated results from
-   *         applying the aggregate to the distinctify hash table.
-   *         Caller is responsible for deleting the returned AggregationState.
-   */
-  virtual AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const = 0;
+  inline std::size_t getResultSize() const {
+    return result_type_->maximumByteLength();
+  }
 
-  /**
-   * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
-   * table and upserts states into the aggregation hash table.
-   *
-   * @param distinctify_hash_table Hash table which stores the GROUP BY
-   *        expression values and aggregation arguments together as hash table
-   *        keys.
-   * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
-   *        This should have been created by calling createGroupByHashTable() on
-   *        this same AggregationHandle.
-   * @param index The index of the distinctify hash table for which we perform
-   *        the DISTINCT aggregation.
-   */
-  virtual void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const = 0;
+  inline void finalize(void *value, const void *state) const {
+    finalize_functor_(value, state);
+  }
 
-  /**
-   * @brief Get the number of bytes needed to store the aggregation handle's
-   *        state.
-   **/
-  virtual std::size_t getPayloadSize() const { return 1; }
+  inline TypedValue finalize(const void *state) const {
+    ScopedBuffer value(state_size_, false);
+    finalize(value.get(), state);
+    TypedValue result = result_type_->makeValue(value.get());
+    result.ensureNotReference();
+    return result;
+  }
 
-  /**
-   * @brief Update the aggregation state for nullary aggregation function e.g.
-   *        COUNT(*).
-   *
-   * @note This function should be overloaded by those aggregation function
-   *       which can perform nullary operations, e.g. COUNT.
-   *
-   * @param byte_ptr The pointer where the aggregation state is stored.
-   **/
-  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+  inline const AggregationStateMergeFunctor& getStateAccumulateFunctor() const {
+    DCHECK(accumulate_functor_);
+    return accumulate_functor_;
+  }
 
-  /**
-   * @brief Update the aggregation state for unary aggregation function e.g.
-   *        SUM(a).
-   *
-   * @param argument The argument which will be used to update the state of the
-   *        aggregation function.
-   * @param byte_ptr The pointer where the aggregation state is stored.
-   **/
-  virtual void updateStateUnary(const TypedValue &argument,
-                                std::uint8_t *byte_ptr) const {}
+  inline const AggregationStateMergeFunctor& getStateMergeFunctor() const {
+    DCHECK(merge_functor_);
+    return merge_functor_;
+  }
 
-  /**
-   * @brief Merge two aggregation states for this aggregation handle.
-   *
-   * @note This function should be used with the hash table specifically meant
-   *       for aggregations only.
-   *
-   * @param src A pointer to the source aggregation state.
-   * @param dst A pointer to the destination aggregation state.
-   **/
-  virtual void mergeStatesFast(const std::uint8_t *src,
-                               std::uint8_t *dst) const {}
+  inline const AggregationStateFinalizeFunctor& getStateFinalizeFunctor() const {
+    DCHECK(finalize_functor_);
+    return finalize_functor_;
+  }
 
-  /**
-   * @brief Initialize the payload (in the aggregation hash table) for the given
-   *        aggregation handle.
-   *
-   * @param byte_ptr The pointer to the aggregation state in the hash table.
-   **/
-  virtual void initPayload(std::uint8_t *byte_ptr) const {}
+ protected:
+  AggregationHandle() {}
 
-  /**
-   * @brief Inform the aggregation handle to block (prohibit) updates on the
-   *        aggregation state.
-   **/
-  virtual void blockUpdate() {}
+  std::size_t state_size_;
+  ScopedBuffer blank_state_;
 
-  /**
-   * @brief Inform the aggregation handle to allow updates on the
-   *        aggregation state.
-   **/
-  virtual void allowUpdate() {}
+  AggregationStateAccumulateFunctor accumulate_functor_;
+  AggregationStateMergeFunctor merge_functor_;
 
- protected:
-  AggregationHandle() {}
+  const Type *result_type_;
+  AggregationStateFinalizeFunctor finalize_functor_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..47f3f41 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -26,7 +26,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableFactory.hpp"
-#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
 #include "types/TypeID.hpp"
@@ -41,165 +40,165 @@ namespace quickstep {
 
 class StorageManager;
 
-AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type), block_update_(false) {
-  // We sum Int as Long and Float as Double so that we have more headroom when
-  // adding many values.
-  TypeID type_precision_id;
-  switch (type.getTypeID()) {
-    case kInt:
-    case kLong:
-      type_precision_id = kLong;
-      break;
-    case kFloat:
-    case kDouble:
-      type_precision_id = kDouble;
-      break;
-    default:
-      type_precision_id = type.getTypeID();
-      break;
-  }
-
-  const Type &sum_type = TypeFactory::GetType(type_precision_id);
-  blank_state_.sum_ = sum_type.makeZeroValue();
-  blank_state_.count_ = 0;
-
-  // Make operators to do arithmetic:
-  // Add operator for summing argument values.
-  fast_add_operator_.reset(
-      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
-  // Add operator for merging states.
-  merge_add_operator_.reset(
-      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-          .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
-  // Divide operator for dividing sum by count to get final average.
-  divide_operator_.reset(
-      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-          .makeUncheckedBinaryOperatorForTypes(sum_type,
-                                               TypeFactory::GetType(kDouble)));
-
-  // Result is nullable, because AVG() over 0 values (or all NULL values) is
-  // NULL.
-  result_type_ =
-      &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-            .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
-            ->getNullableVersion());
-}
-
-AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
-
-  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-  std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateColumnVector(
-      state->sum_, *column_vectors.front(), &count);
-  state->count_ = count;
-  return state;
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
-
-  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-  std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateValueAccessor(
-      state->sum_, accessor, accessor_ids.front(), &count);
-  state->count_ = count;
-  return state;
-}
-#endif
-
-void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for AVG: " << argument_ids.size();
-}
-
-void AggregationHandleAvg::mergeStates(const AggregationState &source,
-                                       AggregationState *destination) const {
-  const AggregationStateAvg &avg_source =
-      static_cast<const AggregationStateAvg &>(source);
-  AggregationStateAvg *avg_destination =
-      static_cast<AggregationStateAvg *>(destination);
-
-  SpinMutexLock lock(avg_destination->mutex_);
-  avg_destination->count_ += avg_source.count_;
-  avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
-      avg_destination->sum_, avg_source.sum_);
-}
-
-void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
-  const TypedValue *src_sum_ptr =
-      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
-  const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
-      source + blank_state_.count_offset_);
-  TypedValue *dst_sum_ptr =
-      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
-  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
-      destination + blank_state_.count_offset_);
-  (*dst_count_ptr) += (*src_count_ptr);
-  *dst_sum_ptr =
-      merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
-}
-
-TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
-  const AggregationStateAvg &agg_state =
-      static_cast<const AggregationStateAvg &>(state);
-  if (agg_state.count_ == 0) {
-    // AVG() over no values is NULL.
-    return result_type_->makeNullValue();
-  } else {
-    // Divide sum by count to get final average.
-    return divide_operator_->applyToTypedValues(
-        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
-  }
-}
-
-ColumnVector* AggregationHandleAvg::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleAvg,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleAvg,
-      AggregationStateAvg>(distinctify_hash_table);
-}
-
-void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleAvg,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleAvg::AggregationHandleAvg(const Type &type) {}
+//    : argument_type_(type), block_update_(false) {
+//  // We sum Int as Long and Float as Double so that we have more headroom when
+//  // adding many values.
+//  TypeID type_precision_id;
+//  switch (type.getTypeID()) {
+//    case kInt:
+//    case kLong:
+//      type_precision_id = kLong;
+//      break;
+//    case kFloat:
+//    case kDouble:
+//      type_precision_id = kDouble;
+//      break;
+//    default:
+//      type_precision_id = type.getTypeID();
+//      break;
+//  }
+//
+//  const Type &sum_type = TypeFactory::GetType(type_precision_id);
+//  blank_state_.sum_ = sum_type.makeZeroValue();
+//  blank_state_.count_ = 0;
+//
+//  // Make operators to do arithmetic:
+//  // Add operator for summing argument values.
+//  fast_add_operator_.reset(
+//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+//          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+//  // Add operator for merging states.
+//  merge_add_operator_.reset(
+//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+//          .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+//  // Divide operator for dividing sum by count to get final average.
+//  divide_operator_.reset(
+//      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+//          .makeUncheckedBinaryOperatorForTypes(sum_type,
+//                                               TypeFactory::GetType(kDouble)));
+//
+//  // Result is nullable, because AVG() over 0 values (or all NULL values) is
+//  // NULL.
+//  result_type_ =
+//      &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+//            .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
+//            ->getNullableVersion());
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
+//    const HashTableImplType hash_table_impl,
+//    const std::vector<const Type *> &group_by_types,
+//    const std::size_t estimated_num_groups,
+//    StorageManager *storage_manager) const {
+//  return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
+//      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleAvg::accumulateColumnVectors(
+//    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+//  DCHECK_EQ(1u, column_vectors.size())
+//      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+//
+//  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
+//  std::size_t count = 0;
+//  state->sum_ = fast_add_operator_->accumulateColumnVector(
+//      state->sum_, *column_vectors.front(), &count);
+//  state->count_ = count;
+//  return state;
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleAvg::accumulateValueAccessor(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &accessor_ids) const {
+//  DCHECK_EQ(1u, accessor_ids.size())
+//      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+//
+//  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
+//  std::size_t count = 0;
+//  state->sum_ = fast_add_operator_->accumulateValueAccessor(
+//      state->sum_, accessor, accessor_ids.front(), &count);
+//  state->count_ = count;
+//  return state;
+//}
+//#endif
+//
+//void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
+//    ValueAccessor *accessor,
+//    const std::vector<attribute_id> &argument_ids,
+//    const std::vector<attribute_id> &group_by_key_ids,
+//    AggregationStateHashTableBase *hash_table) const {
+//  DCHECK_EQ(1u, argument_ids.size())
+//      << "Got wrong number of arguments for AVG: " << argument_ids.size();
+//}
+//
+//void AggregationHandleAvg::mergeStates(const AggregationState &source,
+//                                       AggregationState *destination) const {
+//  const AggregationStateAvg &avg_source =
+//      static_cast<const AggregationStateAvg &>(source);
+//  AggregationStateAvg *avg_destination =
+//      static_cast<AggregationStateAvg *>(destination);
+//
+//  SpinMutexLock lock(avg_destination->mutex_);
+//  avg_destination->count_ += avg_source.count_;
+//  avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
+//      avg_destination->sum_, avg_source.sum_);
+//}
+//
+//void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
+//                                           std::uint8_t *destination) const {
+//  const TypedValue *src_sum_ptr =
+//      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+//  const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
+//      source + blank_state_.count_offset_);
+//  TypedValue *dst_sum_ptr =
+//      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+//  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
+//      destination + blank_state_.count_offset_);
+//  (*dst_count_ptr) += (*src_count_ptr);
+//  *dst_sum_ptr =
+//      merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+//}
+//
+//TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
+//  const AggregationStateAvg &agg_state =
+//      static_cast<const AggregationStateAvg &>(state);
+//  if (agg_state.count_ == 0) {
+//    // AVG() over no values is NULL.
+//    return result_type_->makeNullValue();
+//  } else {
+//    // Divide sum by count to get final average.
+//    return divide_operator_->applyToTypedValues(
+//        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
+//  }
+//}
+//
+//ColumnVector* AggregationHandleAvg::finalizeHashTable(
+//    const AggregationStateHashTableBase &hash_table,
+//    std::vector<std::vector<TypedValue>> *group_by_keys,
+//    int index) const {
+//  return finalizeHashTableHelperFast<AggregationHandleAvg,
+//                                     AggregationStateFastHashTable>(
+//      *result_type_, hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+//    const AggregationStateHashTableBase &distinctify_hash_table) const {
+//  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+//      AggregationHandleAvg,
+//      AggregationStateAvg>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
+//    const AggregationStateHashTableBase &distinctify_hash_table,
+//    AggregationStateHashTableBase *aggregation_hash_table,
+//    std::size_t index) const {
+//  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+//      AggregationHandleAvg,
+//      AggregationStateFastHashTable>(
+//      distinctify_hash_table, aggregation_hash_table, index);
+//}
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 3c6e0c2..cc5adc8 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -26,11 +26,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
@@ -49,193 +46,12 @@ class ValueAccessor;
  */
 
 /**
- * @brief Aggregation state for average.
- */
-class AggregationStateAvg : public AggregationState {
- public:
-  /**
-   * @brief Copy constructor (ignores mutex).
-   */
-  AggregationStateAvg(const AggregationStateAvg &orig)
-      : sum_(orig.sum_),
-        count_(orig.count_),
-        sum_offset_(orig.sum_offset_),
-        count_offset_(orig.count_offset_),
-        mutex_offset_(orig.mutex_offset_) {}
-
-  /**
-   * @brief Destructor.
-   */
-  ~AggregationStateAvg() override {}
-
-  std::size_t getPayloadSize() const {
-    std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
-    std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
-    return (p2 - p1);
-  }
-
-  const std::uint8_t *getPayloadAddress() const {
-    return reinterpret_cast<const uint8_t *>(&sum_);
-  }
-
- private:
-  friend class AggregationHandleAvg;
-
-  AggregationStateAvg()
-      : sum_(0),
-        count_(0),
-        sum_offset_(0),
-        count_offset_(reinterpret_cast<std::uint8_t *>(&count_) -
-                      reinterpret_cast<std::uint8_t *>(&sum_)),
-        mutex_offset_(reinterpret_cast<std::uint8_t *>(&mutex_) -
-                      reinterpret_cast<std::uint8_t *>(&sum_)) {}
-
-  // TODO(shoban): We might want to specialize sum_ and count_ to use atomics
-  // for int types similar to in AggregationStateCount.
-  TypedValue sum_;
-  std::int64_t count_;
-  SpinMutex mutex_;
-
-  int sum_offset_, count_offset_, mutex_offset_;
-};
-
-/**
  * @brief An aggregationhandle for avg.
  **/
-class AggregationHandleAvg : public AggregationConcreteHandle {
+class AggregationHandleAvg : public AggregationHandle {
  public:
   ~AggregationHandleAvg() override {}
 
-  AggregationState* createInitialState() const override {
-    return new AggregationStateAvg(blank_state_);
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate method with average aggregation state.
-   **/
-  inline void iterateUnaryInl(AggregationStateAvg *state,
-                              const TypedValue &value) const {
-    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
-    if (value.isNull()) return;
-
-    SpinMutexLock lock(state->mutex_);
-    state->sum_ = fast_add_operator_->applyToTypedValues(state->sum_, value);
-    ++state->count_;
-  }
-
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
-    if (value.isNull()) return;
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
-    std::int64_t *count_ptr =
-        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
-    *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
-    ++(*count_ptr);
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
-    std::int64_t *count_ptr =
-        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
-    *sum_ptr = blank_state_.sum_;
-    *count_ptr = blank_state_.count_;
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override;
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    const AggregationStateAvg &agg_state =
-        static_cast<const AggregationStateAvg &>(state);
-    // TODO(chasseur): Could improve performance further if we made a special
-    // version of finalizeHashTable() that collects all the sums into one
-    // ColumnVector and all the counts into another and then applies
-    // '*divide_operator_' to them in bulk.
-    return divide_operator_->applyToTypedValues(
-        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(
-        value_ptr + blank_state_.count_offset_);
-    return divide_operator_->applyToTypedValues(
-        *sum_ptr, TypedValue(static_cast<double>(*count_ptr)));
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for AVG aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for AVG aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override {
-    return blank_state_.getPayloadSize();
-  }
-
  private:
   friend class AggregateFunctionAvg;
 
@@ -246,14 +62,12 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
    **/
   explicit AggregationHandleAvg(const Type &type);
 
-  const Type &argument_type_;
-  const Type *result_type_;
-  AggregationStateAvg blank_state_;
-  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
-  std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
-  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
-
-  bool block_update_;
+//  const Type &argument_type_;
+//  const Type *result_type_;
+//  AggregationStateAvg blank_state_;
+//  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+//  std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
+//  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..c095a82 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -21,12 +21,11 @@
 
 #include <atomic>
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "storage/ValueAccessor.hpp"
@@ -48,22 +47,37 @@ class Type;
 class ValueAccessor;
 
 template <bool count_star, bool nullable_type>
-AggregationStateHashTableBase*
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<
-      AggregationStateCount>::CreateResizable(hash_table_impl,
-                                              group_by_types,
-                                              estimated_num_groups,
-                                              storage_manager);
+AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() {
+  state_size_ = sizeof(ResultCppType);
+  blank_state_.reset(state_size_, true);
+
+  accumulate_functor_ = [](void *state, const void *value) {
+    *static_cast<ResultCppType *>(state) += 1;
+  };
+
+  merge_functor_ = [](void *state, const void *value) {
+    *static_cast<ResultCppType *>(state) +=
+        *static_cast<const ResultCppType *>(value);
+  };
+
+  finalize_functor_ = [](void *result, const void *state) {
+    *static_cast<ResultCppType *>(result) =
+        *static_cast<const ResultCppType *>(state);
+  };
+
+  result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID);
+}
+
+template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::accumulateNullary(
+      void *state,
+      const std::size_t num_tuples) const {
+  *static_cast<ResultCppType *>(state) = num_tuples;
 }
 
 template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+void AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+    void *state,
     const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
@@ -84,20 +98,22 @@ AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
           // the population count of the null bitmap). We should do something
           // similar for ValueAccessor too.
           for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
-            count += !column_vector.getTypedValue(pos).isNull();
+            if (column_vector.getUntypedValue(pos) != nullptr) {
+              ++count;
+            }
           }
         } else {
           count = column_vector.size();
         }
       });
 
-  return new AggregationStateCount(count);
+  *static_cast<ResultCppType *>(state) = count;
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+void AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+    void *state,
     ValueAccessor *accessor,
     const std::vector<attribute_id> &accessor_ids) const {
   DCHECK(!count_star)
@@ -114,91 +130,19 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
       [&accessor_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
         if (nullable_type) {
           while (accessor->next()) {
-            count += !accessor->getTypedValue(accessor_id).isNull();
+            if (accessor->getUntypedValue(accessor_id) != nullptr) {
+              ++count;
+            }
           }
         } else {
           count = accessor->getNumTuples();
         }
       });
 
-  return new AggregationStateCount(count);
+  *static_cast<ResultCppType *>(state) = count;
 }
 #endif
 
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateValueAccessorIntoHashTable(
-        ValueAccessor *accessor,
-        const std::vector<attribute_id> &argument_ids,
-        const std::vector<attribute_id> &group_by_key_ids,
-        AggregationStateHashTableBase *hash_table) const {
-  if (count_star) {
-    DCHECK_EQ(0u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT(*): "
-        << argument_ids.size();
-  } else {
-    DCHECK_EQ(1u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT: " << argument_ids.size();
-  }
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStates(
-    const AggregationState &source, AggregationState *destination) const {
-  const AggregationStateCount &count_source =
-      static_cast<const AggregationStateCount &>(source);
-  AggregationStateCount *count_destination =
-      static_cast<AggregationStateCount *>(destination);
-
-  count_destination->count_.fetch_add(
-      count_source.count_.load(std::memory_order_relaxed),
-      std::memory_order_relaxed);
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
-    const std::uint8_t *source, std::uint8_t *destination) const {
-  const std::int64_t *src_count_ptr =
-      reinterpret_cast<const std::int64_t *>(source);
-  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
-  (*dst_count_ptr) += (*src_count_ptr);
-}
-
-template <bool count_star, bool nullable_type>
-ColumnVector*
-AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForSingle(
-        const AggregationStateHashTableBase &distinctify_hash_table) const {
-  DCHECK_EQ(count_star, false);
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount>(distinctify_hash_table);
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForGroupBy(
-        const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
-  DCHECK_EQ(count_star, false);
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
-}
-
 // Explicitly instantiate and compile in the different versions of
 // AggregationHandleCount we need. Note that we do not compile a version with
 // 'count_star == true' and 'nullable_type == true', as that combination is

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..629cf11 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -27,11 +27,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
+#include "types/LongType.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -49,42 +46,6 @@ class AggregationHandleCount;
  */
 
 /**
- * @brief Aggregation state of count.
- */
-class AggregationStateCount : public AggregationState {
- public:
-  /**
-   * @brief Copy constructor.
-   */
-  AggregationStateCount(const AggregationStateCount &state)
-      : count_(state.count_.load(std::memory_order_relaxed)) {}
-
-  /**
-   * @brief Destructor.
-   */
-  ~AggregationStateCount() override {}
-
-  std::size_t getPayloadSize() const { return sizeof(count_); }
-
-  const std::uint8_t* getPayloadAddress() const {
-    return reinterpret_cast<const uint8_t *>(&count_);
-  }
-
- private:
-  friend class AggregationHandleCount<false, false>;
-  friend class AggregationHandleCount<false, true>;
-  friend class AggregationHandleCount<true, false>;
-  friend class AggregationHandleCount<true, true>;
-
-  AggregationStateCount() : count_(0) {}
-
-  explicit AggregationStateCount(const std::int64_t initial_count)
-      : count_(initial_count) {}
-
-  std::atomic<std::int64_t> count_;
-};
-
-/**
  * @brief An aggregationhandle for count.
  *
  * @param count_star If true, this AggregationHandleCount is for nullary
@@ -94,151 +55,35 @@ class AggregationStateCount : public AggregationState {
  *        not nullable and NULL-checks can safely be skipped.
  **/
 template <bool count_star, bool nullable_type>
-class AggregationHandleCount : public AggregationConcreteHandle {
+class AggregationHandleCount : public AggregationHandle {
  public:
   ~AggregationHandleCount() override {}
 
-  AggregationState* createInitialState() const override {
-    return new AggregationStateCount();
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
+  void accumulateNullary(
+      void *state,
+      const std::size_t num_tuples) const override;
 
-  inline void iterateNullaryInl(AggregationStateCount *state) const {
-    state->count_.fetch_add(1, std::memory_order_relaxed);
-  }
-
-  inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    (*count_ptr)++;
-  }
-
-  /**
-   * @brief Iterate with count aggregation state.
-   */
-  inline void iterateUnaryInl(AggregationStateCount *state,
-                              const TypedValue &value) const {
-    if ((!nullable_type) || (!value.isNull())) {
-      state->count_.fetch_add(1, std::memory_order_relaxed);
-    }
-  }
-
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    if ((!nullable_type) || (!value.isNull())) {
-      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-      (*count_ptr)++;
-    }
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateNullaryInlFast(byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    *count_ptr = 0;
-  }
-
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    return new AggregationStateCount(num_tuples);
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
+  void accumulateColumnVectors(
+      void *state,
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  void accumulateValueAccessor(
+      void *state,
       ValueAccessor *accessor,
       const std::vector<attribute_id> &accessor_id) const override;
 #endif
 
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return TypedValue(
-        static_cast<const AggregationStateCount &>(state).count_.load(
-            std::memory_order_relaxed));
-  }
-
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(
-        static_cast<const AggregationStateCount &>(state).count_.load(
-            std::memory_order_relaxed));
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    const std::int64_t *count_ptr =
-        reinterpret_cast<const std::int64_t *>(byte_ptr);
-    return TypedValue(*count_ptr);
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for SUM aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for SUM aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
-
  private:
   friend class AggregateFunctionCount;
 
+  typedef LongType ResultType;
+  typedef ResultType::cpptype ResultCppType;
+
   /**
    * @brief Constructor.
    **/
-  AggregationHandleCount() : block_update_(false) {}
-
-  bool block_update_;
+  AggregationHandleCount();
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 0dc8b56..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-#include <utility>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return createDistinctifyHashTable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
-}
-
-void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(argument_ids.size(), 0u);
-
-  insertValueAccessorIntoDistinctifyHashTable(
-      accessor,
-      group_by_key_ids,
-      hash_table);
-}
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  DCHECK(group_by_keys->empty());
-
-  const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
-                                               const bool &dumb_placeholder) -> void {
-    group_by_keys->emplace_back(std::move(group_by_key));
-  };
-  static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
-
-  return nullptr;
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 838bfdd..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- *  @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
-  /**
-   * @brief Constructor.
-   **/
-  AggregationHandleDistinct() {}
-
-  AggregationState* createInitialState() const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support createInitialState().";
-  }
-
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support accumulateNullary().";
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateColumnVectors().";
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateValueAccessor().";
-  }
-#endif
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
-  }
-
-  TypedValue finalize(const AggregationState &state) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
-  }
-
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForSingle().";
-  }
-
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *groupby_hash_table,
-      std::size_t index) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForGroupBy().";
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_