Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/31 20:13:59 UTC

[2/2] incubator-quickstep git commit: Initial update

Initial update


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c1baa015
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c1baa015
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c1baa015

Branch: refs/heads/collision-free-agg
Commit: c1baa0150bd949b3bff3af39c262106836c4a168
Parents: bc3d8a5
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Oct 27 12:23:03 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Oct 31 15:12:53 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |  22 +-
 expressions/aggregation/AggregationHandle.hpp   |  32 +-
 .../aggregation/AggregationHandleAvg.cpp        |  32 +-
 .../aggregation/AggregationHandleAvg.hpp        |  11 +-
 .../aggregation/AggregationHandleCount.cpp      |  57 +--
 .../aggregation/AggregationHandleCount.hpp      |  11 +-
 .../aggregation/AggregationHandleDistinct.hpp   |  16 +-
 .../aggregation/AggregationHandleMax.cpp        |  30 +-
 .../aggregation/AggregationHandleMax.hpp        |  11 +-
 .../aggregation/AggregationHandleMin.cpp        |  30 +-
 .../aggregation/AggregationHandleMin.hpp        |  11 +-
 .../aggregation/AggregationHandleSum.cpp        |  31 +-
 .../aggregation/AggregationHandleSum.hpp        |  12 +-
 query_execution/QueryContext.hpp                |  14 -
 .../DestroyAggregationStateOperator.cpp         |   7 -
 storage/AggregationOperationState.cpp           | 395 +++++++++----------
 storage/AggregationOperationState.hpp           |  68 ++--
 storage/FastHashTable.hpp                       | 364 ++++++-----------
 storage/HashTableBase.hpp                       |   7 +-
 storage/StorageBlock.cpp                        | 198 ----------
 storage/StorageBlock.hpp                        | 115 ------
 21 files changed, 460 insertions(+), 1014 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..fd23c7e 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -52,17 +52,17 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     AggregationStateHashTableBase *distinctify_hash_table) const {
   // If the key-value pair is already there, we don't need to update the value,
   // which should always be "true". I.e. the value is just a placeholder.
-
-  AggregationStateFastHashTable *hash_table =
-      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
-  if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessorFast(
-        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
-  } else {
-    std::vector<attribute_id> empty_args {kInvalidAttributeID};
-    hash_table->upsertValueAccessorCompositeKeyFast(
-        empty_args, accessor, key_ids, true /* check_for_null_keys */);
-  }
+  LOG(FATAL) << "Not supported";
+//  AggregationStateFastHashTable *hash_table =
+//      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
+//  if (key_ids.size() == 1) {
+//    hash_table->upsertValueAccessorFast(
+//        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
+//  } else {
+//    std::vector<attribute_id> empty_args {kInvalidAttributeID};
+//    hash_table->upsertValueAccessorCompositeKeyFast(
+//        empty_args, accessor, key_ids, true /* check_for_null_keys */);
+//  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 4b51179..751b390 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -32,6 +32,7 @@
 namespace quickstep {
 
 class ColumnVector;
+class ColumnVectorsValueAccessor;
 class StorageManager;
 class Type;
 class ValueAccessor;
@@ -153,39 +154,16 @@ class AggregationHandle {
       const std::size_t num_tuples) const = 0;
 
   /**
-   * @brief Accumulate (iterate over) all values in one or more ColumnVectors
-   *        and return a new AggregationState which can be merged with other
-   *        states or finalized.
+   * @brief Accumulate (iterate over) all values in the argument columns and
+   *        return a new AggregationState which can be merged with other
+   *        states or finalized.
+   *
+   * @param accessor A ValueAccessor over the input relation's attributes.
+   * @param aux_accessor A ColumnVectorsValueAccessor holding the evaluated
+   *        results of non-trivial argument expressions.
+   * @param argument_ids The aggregate function's arguments, in order;
+   *        non-negative ids refer to columns of accessor, negative ids to
+   *        columns of aux_accessor.
    *
-   * @param column_vectors One or more ColumnVectors that the aggregate will be
-   *        applied to. These correspond to the aggregate function's arguments,
-   *        in order.
    * @return A new AggregationState which contains the accumulated results from
   *         applying the aggregate to the argument columns. Caller is responsible
    *         for deleting the returned AggregationState.
    **/
-  virtual AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  /**
-   * @brief Accumulate (iterate over) all values in columns accessible through
-   *        a ValueAccessor and return a new AggregationState which can be
-   *        merged with other states or finalized.
-   *
-   * @param accessor A ValueAccessor that the columns to be aggregated can be
-   *        accessed through.
-   * @param accessor_ids The attribute_ids that correspond to the columns in
-   *        accessor to aggeregate. These correspond to the aggregate
-   *        function's arguments, in order.
-   * @return A new AggregationState which contains the accumulated results from
-   *         applying the aggregate to the specified columns in accessor.
-   *         Caller is responsible for deleting the returned AggregationState.
-   **/
-  virtual AggregationState* accumulateValueAccessor(
+  virtual AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const = 0;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const = 0;
 
   /**
    * @brief Perform an aggregation with GROUP BY over all the tuples accessible

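The new accumulate() entry point replaces both accumulateColumnVectors() and the copy-elision-only accumulateValueAccessor(). Every argument is now addressed by an attribute_id: a non-negative id names a column of the base ValueAccessor, while a negative id names a column of the auxiliary ColumnVectorsValueAccessor that holds pre-evaluated non-trivial expressions. The handles decode a negative id as column -(id + 2) of the auxiliary accessor, which keeps -1 (kInvalidAttributeID) out of the encoded range. A minimal, self-contained sketch of that convention, with plain std::vector columns standing in for the accessors (none of the names below are Quickstep's):

    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <utility>
    #include <vector>

    using attribute_id = int;
    constexpr attribute_id kInvalidAttributeID = -1;

    // Encode the position of a non-trivial expression result as a negative id,
    // starting at -2 so that kInvalidAttributeID (-1) is never produced.
    attribute_id EncodeAuxColumn(const std::size_t aux_index) {
      return -static_cast<attribute_id>(aux_index) - 2;
    }

    // Decode an argument id into (use_aux_accessor, column index).
    std::pair<bool, std::size_t> DecodeArgumentId(const attribute_id id) {
      assert(id != kInvalidAttributeID);
      return id >= 0 ? std::make_pair(false, static_cast<std::size_t>(id))
                     : std::make_pair(true, static_cast<std::size_t>(-(id + 2)));
    }

    int main() {
      // "Base" columns play the role of the ValueAccessor over the input
      // relation; "aux" columns play the role of the ColumnVectorsValueAccessor.
      const std::vector<std::vector<double>> base_columns = {{1, 2, 3}, {4, 5, 6}};
      const std::vector<std::vector<double>> aux_columns = {{10, 20, 30}};

      // SUM over base column 1, then over aux column 0 (encoded as id -2).
      for (const attribute_id id : {attribute_id(1), EncodeAuxColumn(0)}) {
        const auto decoded = DecodeArgumentId(id);
        const auto &values =
            decoded.first ? aux_columns[decoded.second] : base_columns[decoded.second];
        double sum = 0;
        for (const double v : values) {
          sum += v;
        }
        std::cout << "argument id " << id << " -> sum " << sum << "\n";
      }
      return 0;
    }

The same decode appears verbatim in each handle's accumulate() implementation below (argument_id >= 0 ? accessor : aux_accessor, and -(argument_id + 2) for the auxiliary column index).
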
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..672db1f 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -96,34 +96,28 @@ AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
       hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+AggregationState* AggregationHandleAvg::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for AVG: " << argument_ids.size();
 
-  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-  std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateColumnVector(
-      state->sum_, *column_vectors.front(), &count);
-  state->count_ = count;
-  return state;
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
   state->sum_ = fast_add_operator_->accumulateValueAccessor(
-      state->sum_, accessor, accessor_ids.front(), &count);
+      state->sum_, target_accessor, target_argument_id, &count);
   state->count_ = count;
   return state;
 }
-#endif
 
 void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
     ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 47132c6..46983be 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -169,15 +169,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void aggregateValueAccessorIntoHashTable(
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..9c00449 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -63,58 +63,32 @@ AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
 
 template <bool count_star, bool nullable_type>
 AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationHandleCount<count_star, nullable_type>::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
 
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for COUNT: "
-      << column_vectors.size();
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for COUNT: " << argument_ids.size();
 
-  std::size_t count = 0;
-  InvokeOnColumnVector(
-      *column_vectors.front(),
-      [&](const auto &column_vector) -> void {  // NOLINT(build/c++11)
-        if (nullable_type) {
-          // TODO(shoban): Iterating over the ColumnVector is a rather slow way
-          // to do this. We should look at extending the ColumnVector interface
-          // to do a quick count of the non-null values (i.e. the length minus
-          // the population count of the null bitmap). We should do something
-          // similar for ValueAccessor too.
-          for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
-            count += !column_vector.getTypedValue(pos).isNull();
-          }
-        } else {
-          count = column_vector.size();
-        }
-      });
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-  return new AggregationStateCount(count);
-}
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK(!count_star)
-      << "Called non-nullary accumulation method on an AggregationHandleCount "
-      << "set up for nullary COUNT(*)";
-
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for COUNT: " << accessor_ids.size();
-
-  const attribute_id accessor_id = accessor_ids.front();
   std::size_t count = 0;
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      accessor,
-      [&accessor_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
+      target_accessor,
+      [&target_argument_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
         if (nullable_type) {
           while (accessor->next()) {
-            count += !accessor->getTypedValue(accessor_id).isNull();
+            count += !accessor->getTypedValue(target_argument_id).isNull();
           }
         } else {
           count = accessor->getNumTuples();
@@ -123,7 +97,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
 
   return new AggregationStateCount(count);
 }
-#endif
 
 template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>::

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..52e63f5 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -162,15 +162,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     return new AggregationStateCount(num_tuples);
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void aggregateValueAccessorIntoHashTable(
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 838bfdd..5d45219 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -62,21 +62,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
         << "AggregationHandleDistinct does not support accumulateNullary().";
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateColumnVectors().";
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override {
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override {
     LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateValueAccessor().";
+                  "accumulate().";
   }
-#endif
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..aa12ed2 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -54,28 +54,26 @@ AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
       hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+AggregationState* AggregationHandleMax::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MAX: " << argument_ids.size();
 
-  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
+      target_accessor,
+      target_argument_id));
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
     ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index d851a0c..ab7da0a 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -136,15 +136,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void aggregateValueAccessorIntoHashTable(
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..12701f2 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -54,28 +54,26 @@ AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
       hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+AggregationState* AggregationHandleMin::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MIN: " << argument_ids.size();
 
-  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
+      target_accessor,
+      target_argument_id));
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
     ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index e3472ec..dcadb80 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -138,15 +138,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void aggregateValueAccessorIntoHashTable(
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..f00e00c 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -88,29 +88,26 @@ AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
       hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-  std::size_t num_tuples = 0;
-  TypedValue cv_sum = fast_operator_->accumulateColumnVector(
-      blank_state_.sum_, *column_vectors.front(), &num_tuples);
-  return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
+AggregationState* AggregationHandleSum::accumulate(
     ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for SUM: " << argument_ids.size();
+
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
+
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   std::size_t num_tuples = 0;
   TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-      blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+        blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
   return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
 }
-#endif
 
 void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
     ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index f0d23e1..cc24335 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -41,6 +41,7 @@
 namespace quickstep {
 
 class ColumnVector;
+class ColumnVectorsValueAccessor;
 class StorageManager;
 class ValueAccessor;
 
@@ -161,15 +162,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void aggregateValueAccessorIntoHashTable(
       ValueAccessor *accessor,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7ad8fa1..8733093 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
   }
 
   /**
-   * @brief Destroy the payloads from the aggregation hash tables.
-   *
-   * @warning After calling these methods, the hash table will be in an invalid
-   *          state. No other operation should be performed on them.
-   *
-   * @param id The ID of the AggregationOperationState.
-   **/
-  inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
-    DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    aggregation_states_[id]->destroyAggregationHashTablePayload();
-  }
-
-  /**
    * @brief Whether the given GeneratorFunctionHandle id is valid.
    *
    * @param id The GeneratorFunctionHandle id.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
 }
 
 void DestroyAggregationStateWorkOrder::execute() {
-  // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
-  // from the destroyAggregationState call. The reason is that the aggregation
-  // hash tables don't own the AggregationHandle objects. However the hash table
-  // class requires the handles for destroying the payload (see the
-  // destroyPayload methods in AggregationHandle classes). Therefore, we first
-  // destroy the payloads in the hash table and then destroy the hash table.
-  query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
   query_context_->destroyAggregationState(aggr_state_index_);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..6470f33 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,13 +39,14 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "storage/HashTableBase.hpp"
-#include "storage/HashTableFactory.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/TypedValue.hpp"
@@ -83,33 +84,42 @@ AggregationOperationState::AggregationOperationState(
       is_aggregate_partitioned_(checkAggregatePartitioned(
           estimated_num_entries, is_distinct, group_by, aggregate_functions)),
       predicate_(predicate),
-      group_by_list_(std::move(group_by)),
-      arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
       storage_manager_(storage_manager) {
   // Sanity checks: each aggregate has a corresponding list of arguments.
-  DCHECK(aggregate_functions.size() == arguments_.size());
+  DCHECK(aggregate_functions.size() == arguments.size());
 
   // Get the types of GROUP BY expressions for creating HashTables below.
-  std::vector<const Type *> group_by_types;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    group_by_types.emplace_back(&group_by_element->getType());
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    group_by_types_.emplace_back(&group_by_element->getType());
+  }
+
+  // Prepare group-by element attribute ids and non-trivial expressions.
+  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    const attribute_id attr_id =
+        group_by_element->getAttributeIdForValueAccessor();
+    if (attr_id == kInvalidAttributeID) {
+      const attribute_id non_trivial_attr_id =
+          -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+      non_trivial_expressions_.emplace_back(group_by_element.release());
+      group_by_key_ids_.emplace_back(non_trivial_attr_id);
+    } else {
+      group_by_key_ids_.emplace_back(attr_id);
+    }
   }
 
   std::vector<AggregationHandle *> group_by_handles;
-  group_by_handles.clear();
 
   if (aggregate_functions.size() == 0) {
     // If there is no aggregation function, then it is a distinctify operation
     // on the group-by expressions.
-    DCHECK_GT(group_by_list_.size(), 0u);
+    DCHECK_GT(group_by_key_ids_.size(), 0u);
 
     handles_.emplace_back(new AggregationHandleDistinct());
-    arguments_.push_back({});
     is_distinct_.emplace_back(false);
     group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                      hash_table_impl_type,
-                                                     group_by_types,
+                                                     group_by_types_,
                                                      {1},
                                                      handles_,
                                                      storage_manager));
@@ -117,8 +127,8 @@ AggregationOperationState::AggregationOperationState(
     // Set up each individual aggregate in this operation.
     std::vector<const AggregateFunction *>::const_iterator agg_func_it =
         aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
-        args_it = arguments_.begin();
+    std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+        args_it = arguments.begin();
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
     std::vector<HashTableImplType>::const_iterator
         distinctify_hash_table_impl_types_it =
@@ -133,6 +143,22 @@ AggregationOperationState::AggregationOperationState(
         argument_types.emplace_back(&argument->getType());
       }
 
+      // Prepare argument attribute ids and non-trivial expressions.
+      std::vector<attribute_id> argument_ids;
+      for (std::unique_ptr<const Scalar> &argument : *args_it) {
+        const attribute_id attr_id =
+            argument->getAttributeIdForValueAccessor();
+        if (attr_id == kInvalidAttributeID) {
+          const attribute_id non_trivial_attr_id =
+              -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+          non_trivial_expressions_.emplace_back(argument.release());
+          argument_ids.emplace_back(non_trivial_attr_id);
+        } else {
+          argument_ids.emplace_back(attr_id);
+        }
+      }
+      argument_ids_.emplace_back(std::move(argument_ids));
+
       // Sanity checks: aggregate function exists and can apply to the specified
       // arguments.
       DCHECK(*agg_func_it != nullptr);
@@ -142,7 +168,7 @@ AggregationOperationState::AggregationOperationState(
       // to do actual aggregate computation.
       handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
-      if (!group_by_list_.empty()) {
+      if (!group_by_key_ids_.empty()) {
         // Aggregation with GROUP BY: combined payload is partially updated in
         // the presence of DISTINCT.
         if (*is_distinct_it) {
@@ -153,35 +179,12 @@ AggregationOperationState::AggregationOperationState(
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-        // See if all of this aggregate's arguments are attributes in the input
-        // relation. If so, remember the attribute IDs so that we can do copy
-        // elision when actually performing the aggregation.
-        std::vector<attribute_id> local_arguments_as_attributes;
-        local_arguments_as_attributes.reserve(args_it->size());
-        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id =
-              argument->getAttributeIdForValueAccessor();
-          if (argument_id == -1) {
-            local_arguments_as_attributes.clear();
-            break;
-          } else {
-            DCHECK_EQ(input_relation_.getID(),
-                      argument->getRelationIdForValueAccessor());
-            local_arguments_as_attributes.push_back(argument_id);
-          }
-        }
-
-        arguments_as_attributes_.emplace_back(
-            std::move(local_arguments_as_attributes));
-#endif
       }
 
       // Initialize the corresponding distinctify hash table if this is a
       // DISTINCT aggregation.
       if (*is_distinct_it) {
-        std::vector<const Type *> key_types(group_by_types);
+        std::vector<const Type *> key_types(group_by_types_);
         key_types.insert(
             key_types.end(), argument_types.begin(), argument_types.end());
         // TODO(jianqiao): estimated_num_entries is quite inaccurate for
@@ -203,12 +206,12 @@ AggregationOperationState::AggregationOperationState(
       }
     }
 
-    if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool.
+    // Aggregation with GROUP BY: create a HashTable pool.
+    if (!group_by_key_ids_.empty()) {
       if (!is_aggregate_partitioned_) {
         group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                          hash_table_impl_type,
-                                                         group_by_types,
+                                                         group_by_types_,
                                                          payload_sizes,
                                                          group_by_handles,
                                                          storage_manager));
@@ -217,7 +220,7 @@ AggregationOperationState::AggregationOperationState(
             new PartitionedHashTablePool(estimated_num_entries,
                                          FLAGS_num_aggregation_partitions,
                                          hash_table_impl_type,
-                                         group_by_types,
+                                         group_by_types_,
                                          payload_sizes,
                                          group_by_handles,
                                          storage_manager));
@@ -355,7 +358,7 @@ bool AggregationOperationState::ProtoIsValid(
 
 void AggregationOperationState::aggregateBlock(const block_id input_block,
                                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
-  if (group_by_list_.empty()) {
+  if (group_by_key_ids_.empty()) {
     aggregateBlockSingleState(input_block);
   } else {
     aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
@@ -364,7 +367,7 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
 
 void AggregationOperationState::finalizeAggregate(
     InsertDestination *output_destination) {
-  if (group_by_list_.empty()) {
+  if (group_by_key_ids_.empty()) {
     finalizeSingleState(output_destination);
   } else {
     finalizeHashTable(output_destination);
@@ -392,37 +395,53 @@ void AggregationOperationState::aggregateBlockSingleState(
 
   std::unique_ptr<TupleIdSequence> matches;
   if (predicate_ != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor());
     matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
   }
 
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-    // If all arguments are attributes of the input relation, elide a copy.
-    if (!arguments_as_attributes_[agg_idx].empty()) {
-      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+  const auto &tuple_store = block->getTupleStorageSubBlock();
+  std::unique_ptr<ValueAccessor> accessor(
+      tuple_store.createValueAccessor(matches.get()));
+
+  ColumnVectorsValueAccessor non_trivial_results;
+  if (!non_trivial_expressions_.empty()) {
+    SubBlocksReference sub_blocks_ref(tuple_store,
+                                      block->getIndices(),
+                                      block->getIndicesConsistent());
+    for (const auto &expression : non_trivial_expressions_) {
+      non_trivial_results.addColumn(
+          expression->getAllValues(accessor.get(), &sub_blocks_ref));
     }
-#endif
+  }
+
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
+      LOG(FATAL) << "Distinct aggregation not supported";
       // Call StorageBlock::aggregateDistinct() to put the arguments as keys
       // directly into the (threadsafe) shared global distinctify HashTable
       // for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               local_arguments_as_attributes,
-                               {}, /* group_by */
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               nullptr /* reuse_group_by_vectors */);
+//      block->aggregateDistinct(*handles_[agg_idx],
+//                               arguments_[agg_idx],
+//                               local_arguments_as_attributes,
+//                               {}, /* group_by */
+//                               matches.get(),
+//                               distinctify_hashtables_[agg_idx].get(),
+//                               nullptr /* reuse_group_by_vectors */);
+      // TODO(jianqiao): handle distinct
       local_state.emplace_back(nullptr);
     } else {
-      // Call StorageBlock::aggregate() to actually do the aggregation.
-      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
-                                                arguments_[agg_idx],
-                                                local_arguments_as_attributes,
-                                                matches.get()));
+      const auto &argument_ids = argument_ids_[agg_idx];
+      const auto *handle = handles_[agg_idx];
+
+      AggregationState *state;
+      if (argument_ids.empty()) {
+        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+        state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+                                                             : matches->size());
+      } else {
+        // Have the AggregationHandle actually do the aggregation.
+        state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
+      }
+      local_state.emplace_back(state);
     }
   }
 
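The rewritten aggregateBlockSingleState() above applies the predicate once, evaluates every non-trivial argument expression once into a ColumnVectorsValueAccessor, and then has each handle accumulate directly from either the base or the auxiliary columns (or from the match count alone for nullary COUNT(*)). A rough, self-contained model of that flow, using plain vectors in place of blocks and accessors (all names below are illustrative, not Quickstep's):

    #include <cstddef>
    #include <iostream>
    #include <optional>
    #include <vector>

    using attribute_id = int;

    // A "block": base columns plus an optional match bitmap from the predicate.
    struct Block {
      std::vector<std::vector<double>> base_columns;
      std::optional<std::vector<bool>> matches;  // nullopt means all tuples match
    };

    int main() {
      Block block;
      block.base_columns = {{1, 2, 3, 4}, {10, 20, 30, 40}};
      block.matches = std::vector<bool>{true, false, true, true};
      const std::size_t num_tuples = block.base_columns[0].size();

      // Evaluate each non-trivial argument expression once per block (here just
      // col0 + col1), producing auxiliary columns addressed by ids -(index) - 2.
      std::vector<std::vector<double>> aux_columns(1);
      for (std::size_t i = 0; i < num_tuples; ++i) {
        aux_columns[0].push_back(block.base_columns[0][i] + block.base_columns[1][i]);
      }

      // Two aggregates: COUNT(*) with no arguments, and SUM(col0 + col1) whose
      // single argument is the auxiliary column encoded as id -2.
      const std::vector<std::vector<attribute_id>> argument_ids = {{}, {-2}};

      for (const std::vector<attribute_id> &ids : argument_ids) {
        if (ids.empty()) {
          // Nullary aggregate: only the number of matching tuples is needed.
          std::size_t count = 0;
          for (std::size_t i = 0; i < num_tuples; ++i) {
            count += !block.matches || (*block.matches)[i];
          }
          std::cout << "COUNT(*) = " << count << "\n";
        } else {
          // Unary aggregate: pick the base or auxiliary column and accumulate.
          const attribute_id id = ids.front();
          const std::vector<double> &column =
              id >= 0 ? block.base_columns[id] : aux_columns[-(id + 2)];
          double sum = 0;
          for (std::size_t i = 0; i < num_tuples; ++i) {
            if (!block.matches || (*block.matches)[i]) {
              sum += column[i];
            }
          }
          std::cout << "SUM = " << sum << "\n";
        }
      }
      return 0;
    }
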
@@ -435,104 +454,107 @@ void AggregationOperationState::aggregateBlockHashTable(
     LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
+  const auto &tuple_store = block->getTupleStorageSubBlock();
+  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+  std::unique_ptr<ValueAccessor> shared_accessor;
+  ValueAccessor *accessor = base_accessor.get();
 
   // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
   // as the existence map for the tuples.
   std::unique_ptr<TupleIdSequence> matches;
   if (predicate_ != nullptr) {
     matches.reset(block->getMatchesForPredicate(predicate_.get()));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
   }
   if (lip_filter_adaptive_prober != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
-    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
   }
 
-  // This holds values of all the GROUP BY attributes so that the can be reused
-  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
-  // GROUP BY expressions once).
-  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
+  ColumnVectorsValueAccessor non_trivial_results;
+  if (!non_trivial_expressions_.empty()) {
+    SubBlocksReference sub_blocks_ref(tuple_store,
+                                      block->getIndices(),
+                                      block->getIndicesConsistent());
+    for (const auto &expression : non_trivial_expressions_) {
+      non_trivial_results.addColumn(
+          expression->getAllValues(accessor, &sub_blocks_ref));
+    }
+  }
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
+      LOG(FATAL) << "Distinct aggregation not supported";
       // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
       // expression
       // values and the aggregation arguments together as keys directly into the
       // (threadsafe) shared global distinctify HashTable for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               nullptr, /* arguments_as_attributes */
-                               group_by_list_,
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               &reuse_group_by_vectors);
+//      block->aggregateDistinct(*handles_[agg_idx],
+//                               arguments_[agg_idx],
+//                               nullptr, /* arguments_as_attributes */
+//                               group_by_list_,
+//                               matches.get(),
+//                               distinctify_hashtables_[agg_idx].get(),
+//                               &reuse_group_by_vectors);
+      // TODO(jianqiao): handle distinct
     }
   }
 
   if (!is_aggregate_partitioned_) {
-    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-    // directly into the (threadsafe) shared global HashTable for this
-    // aggregate.
     DCHECK(group_by_hashtable_pool_ != nullptr);
+
     AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTableFast();
+        group_by_hashtable_pool_->getHashTableFast();
     DCHECK(agg_hash_table != nullptr);
-    block->aggregateGroupBy(arguments_,
-                            group_by_list_,
-                            matches.get(),
-                            agg_hash_table,
-                            &reuse_group_by_vectors);
+
+    accessor->beginIterationVirtual();
+    agg_hash_table->upsertValueAccessorCompositeKeyFast(accessor,
+                                                        &non_trivial_results,
+                                                        argument_ids_,
+                                                        group_by_key_ids_,
+                                                        true /* check_for_null_keys */);
     group_by_hashtable_pool_->returnHashTable(agg_hash_table);
   } else {
-    ColumnVectorsValueAccessor temp_result;
-    // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-    std::vector<attribute_id> argument_ids;
-
-    // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-    std::vector<attribute_id> key_ids;
-    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-    block->aggregateGroupByPartitioned(
-        arguments_,
-        group_by_list_,
-        matches.get(),
-        num_partitions,
-        &temp_result,
-        &argument_ids,
-        &key_ids,
-        &reuse_group_by_vectors);
-    // Compute the partitions for the tuple formed by group by values.
-    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
-    partition_membership.resize(num_partitions);
-
-    // Create a tuple-id sequence for each partition.
-    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      partition_membership[partition].reset(
-          new TupleIdSequence(temp_result.getEndPosition()));
-    }
-
-    // Iterate over ValueAccessor for each tuple,
-    // set a bit in the appropriate TupleIdSequence.
-    temp_result.beginIteration();
-    while (temp_result.next()) {
-      // We need a unique_ptr because getTupleWithAttributes() uses "new".
-      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
-      const std::size_t curr_tuple_partition_id =
-          curr_tuple->getTupleHash() % num_partitions;
-      partition_membership[curr_tuple_partition_id]->set(
-          temp_result.getCurrentPosition(), true);
-    }
-    // For each partition, create an adapter around Value Accessor and
-    // TupleIdSequence.
-    std::vector<std::unique_ptr<
-        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
-    adapter.resize(num_partitions);
-    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
-          *(partition_membership)[partition]));
-      partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKeyFast(
-              argument_ids, adapter[partition].get(), key_ids, true);
-    }
+    LOG(FATAL) << "Partitioned aggregation not supported";
+//    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+//
+//    // Compute the partitions for the tuple formed by group by values.
+//    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+//    partition_membership.resize(num_partitions);
+//
+//    // Create a tuple-id sequence for each partition.
+//    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+//      partition_membership[partition].reset(
+//          new TupleIdSequence(temp_result.getEndPosition()));
+//    }
+//
+//    // Iterate over ValueAccessor for each tuple,
+//    // set a bit in the appropriate TupleIdSequence.
+//    temp_result.beginIteration();
+//    while (temp_result.next()) {
+//      // We need a unique_ptr because getTupleWithAttributes() uses "new".
+//      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+//      const std::size_t curr_tuple_partition_id =
+//          curr_tuple->getTupleHash() % num_partitions;
+//      partition_membership[curr_tuple_partition_id]->set(
+//          temp_result.getCurrentPosition(), true);
+//    }
+//    // For each partition, create an adapter around Value Accessor and
+//    // TupleIdSequence.
+//    std::vector<std::unique_ptr<
+//        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+//    adapter.resize(num_partitions);
+//    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+//      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+//          *(partition_membership)[partition]));
+//      partitioned_group_by_hashtable_pool_->getHashTable(partition)
+//          ->upsertValueAccessorCompositeKeyFast(
+//              argument_ids, adapter[partition].get(), key_ids, true);
+//    }
   }
 }
 
@@ -577,59 +599,36 @@ void AggregationOperationState::finalizeHashTable(
   // e.g. Keep merging entries from smaller hash tables to larger.
 
   auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  if (hash_tables->size() > 1) {
-    for (int hash_table_index = 0;
-         hash_table_index < static_cast<int>(hash_tables->size() - 1);
-         ++hash_table_index) {
-      // Merge each hash table to the last hash table.
-      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
-                             hash_tables->back().get());
-    }
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+      hash_tables->back().release());
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+    hash_table->destroyPayload();
   }
 
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pool_ != nullptr);
-      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      DCHECK(hash_tables != nullptr);
-      if (hash_tables->empty()) {
-        // We may have a case where hash_tables is empty, e.g. no input blocks.
-        // However for aggregateOnDistinctifyHashTableForGroupBy to work
-        // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pool_->getHashTableFast();
-        group_by_hashtable_pool_->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      }
-      DCHECK(hash_tables->back() != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-      DCHECK(agg_hash_table != nullptr);
       handles_[agg_idx]->allowUpdate();
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
+          *distinctify_hashtables_[agg_idx], final_hash_table.get(), agg_idx);
     }
 
-    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(hash_tables != nullptr);
-    if (hash_tables->empty()) {
-      // We may have a case where hash_tables is empty, e.g. no input blocks.
-      // However for aggregateOnDistinctifyHashTableForGroupBy to work
-      // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pool_->getHashTableFast();
-      group_by_hashtable_pool_->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-    AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-    DCHECK(agg_hash_table != nullptr);
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
+        *final_hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
+  final_hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
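
The merge step in finalizeHashTable() above folds every thread-private hash table from the pool into the last one, destroying each merged table's payload as soon as it has been absorbed, and destroys the surviving table's payload only after all aggregates have been finalized from it. A compact stand-in using standard containers (the key-to-running-sum payload here is an assumption, not the real FastHashTable layout):

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Stand-in for an aggregation hash table: group-by key -> running SUM payload.
    using HashTable = std::unordered_map<std::string, double>;

    // Merge 'source' into 'destination' by combining payloads of matching keys.
    void MergeGroupByHashTables(const HashTable &source, HashTable *destination) {
      for (const auto &entry : source) {
        (*destination)[entry.first] += entry.second;
      }
    }

    int main() {
      // Thread-private hash tables produced while aggregating input blocks.
      std::vector<std::unique_ptr<HashTable>> pool;
      pool.emplace_back(new HashTable{{"a", 1.0}, {"b", 2.0}});
      pool.emplace_back(new HashTable{{"a", 4.0}, {"c", 8.0}});

      if (pool.empty()) {
        return 0;
      }

      // Keep the last table as the final one; fold every other table into it
      // and release each merged table right away (destroyPayload() in the diff).
      std::unique_ptr<HashTable> final_table(pool.back().release());
      for (std::size_t i = 0; i + 1 < pool.size(); ++i) {
        std::unique_ptr<HashTable> table(pool[i].release());
        MergeGroupByHashTables(*table, final_table.get());
        // 'table' goes out of scope here, analogous to destroying its payload.
      }

      // Finalize from the single merged table, then drop it as well.
      for (const auto &entry : *final_table) {
        std::cout << entry.first << " -> " << entry.second << "\n";
      }
      return 0;
    }
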
@@ -640,11 +639,10 @@ void AggregationOperationState::finalizeHashTable(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
       NativeColumnVector *element_cv =
-          new NativeColumnVector(group_by_type, group_by_keys.size());
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
         element_cv->appendTypedValue(
@@ -652,7 +650,7 @@ void AggregationOperationState::finalizeHashTable(
       }
     } else {
       IndirectColumnVector *element_cv =
-          new IndirectColumnVector(group_by_type, group_by_keys.size());
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
         element_cv->appendTypedValue(
@@ -676,25 +674,6 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::destroyAggregationHashTablePayload() {
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
-      nullptr;
-  if (!is_aggregate_partitioned_) {
-    if (group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-  } else {
-    if (partitioned_group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
-    }
-  }
-  if (all_hash_tables != nullptr) {
-    for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
-      (*all_hash_tables)[ht_index]->destroyPayload();
-    }
-  }
-}
-
 void AggregationOperationState::finalizeAggregatePartitioned(
     const std::size_t partition_id, InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
@@ -703,15 +682,16 @@ void AggregationOperationState::finalizeAggregatePartitioned(
 
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
+  AggregationStateHashTableBase *hash_table =
+      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    AggregationStateHashTableBase *hash_table =
-        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
         *hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
+  hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,16 +702,17 @@ void AggregationOperationState::finalizeAggregatePartitioned(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
         element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     } else {
-      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
         element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));

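
Both finalize paths above materialize the group-by keys the same way: the
row-major group_by_keys (one vector of values per group) is transposed into one
column per group-by type, using a NativeColumnVector when the type supports it
and an IndirectColumnVector otherwise. A small standalone illustration of just
the transposition step, with plain std::vector and std::string standing in for
Quickstep's TypedValue and ColumnVector classes:

#include <iostream>
#include <string>
#include <vector>

int main() {
  // Row-major keys: one entry per group, one value per group-by column.
  std::vector<std::vector<std::string>> group_by_keys = {
      {"2016", "WI"}, {"2016", "CA"}, {"2015", "WI"}};

  // Transpose into column-major form, one vector per group-by column.
  const std::size_t num_columns = group_by_keys.front().size();
  std::vector<std::vector<std::string>> group_by_columns(num_columns);
  for (std::size_t col = 0; col < num_columns; ++col) {
    group_by_columns[col].reserve(group_by_keys.size());
    for (std::vector<std::string> &group_key : group_by_keys) {
      // The real code moves each TypedValue into a Native/IndirectColumnVector.
      group_by_columns[col].emplace_back(std::move(group_key[col]));
    }
  }

  std::cout << "column 0 has " << group_by_columns[0].size() << " values\n";
  return 0;
}

In the real code each value is moved rather than copied, which is why each
group_key is taken by non-const reference.
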
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e0826b0..9f2bd92 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/PartitionedHashTablePool.hpp"
+#include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
@@ -46,6 +47,7 @@ class CatalogRelationSchema;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
+class TupleIdSequence;
 
 DECLARE_int32(num_aggregation_partitions);
 DECLARE_int32(partition_aggregation_num_groups_threshold);
@@ -178,11 +180,6 @@ class AggregationOperationState {
   void finalizeAggregate(InsertDestination *output_destination);
 
   /**
-   * @brief Destroy the payloads in the aggregation hash tables.
-   **/
-  void destroyAggregationHashTablePayload();
-
-  /**
    * @brief Generate the final results for the aggregates managed by this
    *        AggregationOperationState and write them out to StorageBlock(s).
    *        In this implementation, each thread picks a hash table belonging to
@@ -236,27 +233,28 @@ class AggregationOperationState {
       const std::vector<bool> &is_distinct,
       const std::vector<std::unique_ptr<const Scalar>> &group_by,
       const std::vector<const AggregateFunction *> &aggregate_functions) const {
-    // If there's no aggregation, return false.
-    if (aggregate_functions.empty()) {
-      return false;
-    }
-    // Check if there's a distinct operation involved in any aggregate, if so
-    // the aggregate can't be partitioned.
-    for (auto distinct : is_distinct) {
-      if (distinct) {
-        return false;
-      }
-    }
-    // There's no distinct aggregation involved, Check if there's at least one
-    // GROUP BY operation.
-    if (group_by.empty()) {
-      return false;
-    }
-    // There are GROUP BYs without DISTINCT. Check if the estimated number of
-    // groups is large enough to warrant a partitioned aggregation.
-    return estimated_num_groups >
-           static_cast<std::size_t>(
-               FLAGS_partition_aggregation_num_groups_threshold);
+//    // If there's no aggregation, return false.
+//    if (aggregate_functions.empty()) {
+//      return false;
+//    }
+//    // Check if there's a distinct operation involved in any aggregate, if so
+//    // the aggregate can't be partitioned.
+//    for (auto distinct : is_distinct) {
+//      if (distinct) {
+//        return false;
+//      }
+//    }
+//    // There's no distinct aggregation involved. Check if there's at least one
+//    // GROUP BY operation.
+//    if (group_by.empty()) {
+//      return false;
+//    }
+//    // There are GROUP BYs without DISTINCT. Check if the estimated number of
+//    // groups is large enough to warrant a partitioned aggregation.
+//    return estimated_num_groups >
+//           static_cast<std::size_t>(
+//               FLAGS_partition_aggregation_num_groups_threshold);
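+    // NOTE: Partitioned aggregation is disabled for now in this update.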
+    return false;
   }
 
   // Common state for all aggregates in this operation: the input relation, the
@@ -267,27 +265,27 @@ class AggregationOperationState {
   const bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
-  std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
-  // some number of Scalar arguments.
+  // zero or one argument (see 'argument_ids_' below).
   std::vector<AggregationHandle *> handles_;
-  std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
   // arguments.
   std::vector<bool> is_distinct_;
 
+  // Non-trivial group-by/argument expressions that need to be evaluated.
+  std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
+
+  std::vector<attribute_id> group_by_key_ids_;
+  std::vector<std::vector<attribute_id>> argument_ids_;
+
+  std::vector<const Type *> group_by_types_;
+
   // Hash table for obtaining distinct (i.e. unique) arguments.
   std::vector<std::unique_ptr<AggregationStateHashTableBase>>
       distinctify_hashtables_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
-
   // Per-aggregate global states for aggregation without GROUP BY.
   std::vector<std::unique_ptr<AggregationState>> single_states_;
 

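
With the members above, group-by keys and aggregate arguments are carried as
attribute ids rather than Scalar expressions: plain attributes keep their
non-negative ids, while the non-trivial expressions collected in
non_trivial_expressions_ are evaluated into an auxiliary
ColumnVectorsValueAccessor and referenced through negative ids. Judging from the
FastHashTable change below, a negative id n maps to column -(n + 2) of that
auxiliary accessor (so -2 is column 0, -3 is column 1, and so on), which keeps
-1 available as kInvalidAttributeID. A self-contained sketch of that arithmetic
(the helper names are illustrative only):

#include <cassert>
#include <cstdio>

// Illustrative only: attribute_id is a signed integer type in Quickstep.
using attribute_id = int;
constexpr attribute_id kInvalidAttributeID = -1;

// Encode the position of an evaluated non-trivial expression in the auxiliary
// accessor as a negative attribute id, and decode it back.
constexpr attribute_id EncodeAuxColumn(const int aux_column) {
  return -(aux_column + 2);
}
constexpr int DecodeAuxColumn(const attribute_id encoded_id) {
  return -(encoded_id + 2);
}

int main() {
  assert(EncodeAuxColumn(0) == -2);   // first aux column
  assert(DecodeAuxColumn(-2) == 0);
  assert(DecodeAuxColumn(-5) == 3);
  assert(EncodeAuxColumn(0) != kInvalidAttributeID);  // -1 stays reserved
  std::printf("encoded id -3 refers to aux column %d\n", DecodeAuxColumn(-3));
  return 0;
}
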
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a82a62..04e33bd 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,6 +39,7 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
@@ -407,109 +408,17 @@ class FastHashTable : public HashTableBase<resizable,
                               const std::uint8_t *init_value_ptr,
                               const std::uint8_t *source_state);
 
-  /**
-   * @brief Apply a functor to (multiple) entries in this hash table, with keys
-   *        drawn from a ValueAccessor. New values are first inserted if not
-   *        already present.
-   *
-   * @warning This method is only usable if allow_duplicate_keys is false.
-   * @warning This method is threadsafe with regard to other calls to upsert(),
-   *          upsertCompositeKey(), upsertValueAccessor(), and
-   *          upsertValueAccessorCompositeKey(), but should not be used
-   *          simultaneously with put(), putCompositeKey(), putValueAccessor(),
-   *          or putValueAccessorCompositeKey().
-   * @warning The ValueAccessor reference and ValueT* pointer passed to
-   *          functor's call operator are only guaranteed to be valid for the
-   *          duration of the call. The functor should not store a copy of
-   *          these pointers and assume that they remain valid.
-   * @warning Although this method itself is threadsafe, the ValueT object
-   *          accessed by functor is not guaranteed to be (although it is
-   *          guaranteed that its initial insertion will be atomic). If it is
-   *          possible for multiple threads to call upsertValueAccessor() with
-   *          the same key at the same time, then their access to ValueT should
-   *          be made threadsafe (e.g. with the use of atomic types, mutexes,
-   *          or some other external synchronization).
-   * @note This version is for single scalar keys, see also
-   *       upsertValueAccessorCompositeKey().
-   * @note If the hash table is (close to) full and resizable is true, this
-   *       routine might result in rebuilding the entire hash table.
-   *
-   * @param accessor A ValueAccessor which will be used to access keys.
-   *        beginIteration() should be called on accessor before calling this
-   *        method.
-   * @param key_attr_id The attribute ID of the keys to be read from accessor.
-   * @param check_for_null_keys If true, each key will be checked to see if it
-   *        is null before upserting it (null keys are skipped). This must be
-   *        set to true if some of the keys that will be read from accessor may
-   *        be null.
-   * @param functor A pointer to a functor, which should provide a call
-   *        operator that takes two arguments: const ValueAccessor& (or better
-   *        yet, a templated call operator which takes a const reference to
-   *        some subclass of ValueAccessor as its first argument) and ValueT*.
-   *        The call operator will be invoked once for every tuple with a
-   *        non-null key in accessor.
-   * @return True on success, false if upsert failed because there was not
-   *         enough space to insert new entries for all the keys in accessor
-   *         (note that some entries may still have been upserted, and
-   *         accessor's iteration will be left on the first tuple which could
-   *         not be inserted).
-   **/
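+  /**
+   * @brief Upsert aggregation states for tuples accessed via a ValueAccessor,
+   *        keyed by a single scalar attribute. Currently a stub that fails
+   *        with LOG(FATAL); see the composite-key version below.
+   **/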
   bool upsertValueAccessorFast(
-      const std::vector<attribute_id> &argument_ids,
       ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids,
       const attribute_id key_attr_id,
       const bool check_for_null_keys);
 
-  /**
-   * @brief Apply a functor to (multiple) entries in this hash table, with keys
-   *        drawn from a ValueAccessor. New values are first inserted if not
-   *        already present. Composite key version.
-   *
-   * @warning This method is only usable if allow_duplicate_keys is false.
-   * @warning This method is threadsafe with regard to other calls to upsert(),
-   *          upsertCompositeKey(), upsertValueAccessor(), and
-   *          upsertValueAccessorCompositeKey(), but should not be used
-   *          simultaneously with put(), putCompositeKey(), putValueAccessor(),
-   *          or putValueAccessorCompositeKey().
-   * @warning The ValueAccessor reference and ValueT* pointer passed to
-   *          functor's call operator are only guaranteed to be valid for the
-   *          duration of the call. The functor should not store a copy of
-   *          these pointers and assume that they remain valid.
-   * @warning Although this method itself is threadsafe, the ValueT object
-   *          accessed by functor is not guaranteed to be (although it is
-   *          guaranteed that its initial insertion will be atomic). If it is
-   *          possible for multiple threads to call upsertValueAccessor() with
-   *          the same key at the same time, then their access to ValueT should
-   *          be made threadsafe (e.g. with the use of atomic types, mutexes,
-   *          or some other external synchronization).
-   * @note This version is for composite keys, see also upsertValueAccessor().
-   * @note If the hash table is (close to) full and resizable is true, this
-   *       routine might result in rebuilding the entire hash table.
-   *
-   * @param accessor A ValueAccessor which will be used to access keys.
-   *        beginIteration() should be called on accessor before calling this
-   *        method.
-   * @param key_attr_ids The attribute IDs of each key component to be read
-   *        from accessor.
-   * @param check_for_null_keys If true, each key will be checked to see if it
-   *        is null before upserting it (null keys are skipped). This must be
-   *        set to true if some of the keys that will be read from accessor may
-   *        be null.
-   * @param functor A pointer to a functor, which should provide a call
-   *        operator that takes two arguments: const ValueAccessor& (or better
-   *        yet, a templated call operator which takes a const reference to
-   *        some subclass of ValueAccessor as its first argument) and ValueT*.
-   *        The call operator will be invoked once for every tuple with a
-   *        non-null key in accessor.
-   * @return True on success, false if upsert failed because there was not
-   *         enough space to insert new entries for all the keys in accessor
-   *         (note that some entries may still have been upserted, and
-   *         accessor's iteration will be left on the first tuple which could
-   *         not be inserted).
-   **/
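+  /**
+   * @brief Composite-key version of upsertValueAccessorFast(). For each
+   *        handle, the argument value is read either from 'accessor' (for a
+   *        non-negative attribute id) or from 'aux_accessor', which holds the
+   *        evaluated non-trivial expressions (for a negative encoded id).
+   **/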
   bool upsertValueAccessorCompositeKeyFast(
-      const std::vector<attribute_id> &argument,
       ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<std::vector<attribute_id>> &argument_ids,
       const std::vector<attribute_id> &key_attr_ids,
       const bool check_for_null_keys) override;
 
@@ -1779,84 +1688,87 @@ bool FastHashTable<resizable,
                    force_key_copy,
                    allow_duplicate_keys>::
     upsertValueAccessorFast(
-        const std::vector<attribute_id> &argument_ids,
         ValueAccessor *accessor,
+        ColumnVectorsValueAccessor *aux_accessor,
+        const std::vector<attribute_id> &argument_ids,
         const attribute_id key_attr_id,
         const bool check_for_null_keys) {
-  DEBUG_ASSERT(!allow_duplicate_keys);
-  std::size_t variable_size;
-  return InvokeOnAnyValueAccessor(
-      accessor,
-      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-        if (resizable) {
-          bool continuing = true;
-          while (continuing) {
-            {
-              continuing = false;
-              SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
-              while (accessor->next()) {
-                TypedValue key = accessor->getTypedValue(key_attr_id);
-                if (check_for_null_keys && key.isNull()) {
-                  continue;
-                }
-                variable_size = (force_key_copy && !scalar_key_inline_)
-                                    ? key.getDataSize()
-                                    : 0;
-                std::uint8_t *value =
-                    this->upsertInternalFast(key, variable_size, nullptr);
-                if (value == nullptr) {
-                  continuing = true;
-                  break;
-                } else {
-                  SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-                  for (unsigned int k = 0; k < num_handles_; ++k) {
-                    if (argument_ids[k] != kInvalidAttributeID) {
-                      handles_[k]->updateStateUnary(
-                          accessor->getTypedValue(argument_ids[k]),
-                          value + payload_offsets_[k]);
-                    } else {
-                      handles_[k]->updateStateNullary(value +
-                                                      payload_offsets_[k]);
-                    }
-                  }
-                }
-              }
-            }
-            if (continuing) {
-              this->resize(0, variable_size);
-              accessor->previous();
-            }
-          }
-        } else {
-          while (accessor->next()) {
-            TypedValue key = accessor->getTypedValue(key_attr_id);
-            if (check_for_null_keys && key.isNull()) {
-              continue;
-            }
-            variable_size =
-                (force_key_copy && !scalar_key_inline_) ? key.getDataSize() : 0;
-            std::uint8_t *value =
-                this->upsertInternalFast(key, variable_size, nullptr);
-            if (value == nullptr) {
-              return false;
-            } else {
-              SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-              for (unsigned int k = 0; k < num_handles_; ++k) {
-                if (argument_ids[k] != kInvalidAttributeID) {
-                  handles_[k]->updateStateUnary(
-                      accessor->getTypedValue(argument_ids[k]),
-                      value + payload_offsets_[k]);
-                } else {
-                  handles_[k]->updateStateNullary(value +
-                                                  payload_offsets_[k]);
-                }
-              }
-            }
-          }
-        }
-
-        return true;
-      });
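+  // Not implemented in this initial update (see the composite-key version
+  // below for the implemented path).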
+  LOG(FATAL) << "Not implemented";
+  return true;
+//  DEBUG_ASSERT(!allow_duplicate_keys);
+//  std::size_t variable_size;
+//  return InvokeOnAnyValueAccessor(
+//      accessor,
+//      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+//        if (resizable) {
+//          bool continuing = true;
+//          while (continuing) {
+//            {
+//              continuing = false;
+//              SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+//              while (accessor->next()) {
+//                TypedValue key = accessor->getTypedValue(key_attr_id);
+//                if (check_for_null_keys && key.isNull()) {
+//                  continue;
+//                }
+//                variable_size = (force_key_copy && !scalar_key_inline_)
+//                                    ? key.getDataSize()
+//                                    : 0;
+//                std::uint8_t *value =
+//                    this->upsertInternalFast(key, variable_size, nullptr);
+//                if (value == nullptr) {
+//                  continuing = true;
+//                  break;
+//                } else {
+//                  SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+//                  for (unsigned int k = 0; k < num_handles_; ++k) {
+//                    if (argument_ids[k] != kInvalidAttributeID) {
+//                      handles_[k]->updateStateUnary(
+//                          accessor->getTypedValue(argument_ids[k]),
+//                          value + payload_offsets_[k]);
+//                    } else {
+//                      handles_[k]->updateStateNullary(value +
+//                                                      payload_offsets_[k]);
+//                    }
+//                  }
+//                }
+//              }
+//            }
+//            if (continuing) {
+//              this->resize(0, variable_size);
+//              accessor->previous();
+//            }
+//          }
+//        } else {
+//          while (accessor->next()) {
+//            TypedValue key = accessor->getTypedValue(key_attr_id);
+//            if (check_for_null_keys && key.isNull()) {
+//              continue;
+//            }
+//            variable_size =
+//                (force_key_copy && !scalar_key_inline_) ? key.getDataSize() : 0;
+//            std::uint8_t *value =
+//                this->upsertInternalFast(key, variable_size, nullptr);
+//            if (value == nullptr) {
+//              return false;
+//            } else {
+//              SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+//              for (unsigned int k = 0; k < num_handles_; ++k) {
+//                if (argument_ids[k] != kInvalidAttributeID) {
+//                  handles_[k]->updateStateUnary(
+//                      accessor->getTypedValue(argument_ids[k]),
+//                      value + payload_offsets_[k]);
+//                } else {
+//                  handles_[k]->updateStateNullary(value +
+//                                                  payload_offsets_[k]);
+//                }
+//              }
+//            }
+//          }
+//        }
+//
+//        return true;
+//      });
 }
 
 template <bool resizable,
@@ -1868,90 +1780,70 @@ bool FastHashTable<resizable,
                    force_key_copy,
                    allow_duplicate_keys>::
     upsertValueAccessorCompositeKeyFast(
-        const std::vector<attribute_id> &argument_ids,
         ValueAccessor *accessor,
+        ColumnVectorsValueAccessor *aux_accessor,
+        const std::vector<std::vector<attribute_id>> &argument_ids,
         const std::vector<attribute_id> &key_attr_ids,
         const bool check_for_null_keys) {
+  DEBUG_ASSERT(resizable);
   DEBUG_ASSERT(!allow_duplicate_keys);
+
   std::size_t variable_size;
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_attr_ids.size());
   return InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-        if (resizable) {
-          bool continuing = true;
-          while (continuing) {
-            {
-              continuing = false;
-              SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
-              while (accessor->next()) {
-                if (this->GetCompositeKeyFromValueAccessor(*accessor,
-                                                           key_attr_ids,
-                                                           check_for_null_keys,
-                                                           &key_vector)) {
-                  continue;
-                }
-                variable_size =
-                    this->calculateVariableLengthCompositeKeyCopySize(
-                        key_vector);
-                std::uint8_t *value = this->upsertCompositeKeyInternalFast(
-                    key_vector, nullptr, variable_size);
-                if (value == nullptr) {
-                  continuing = true;
-                  break;
-                } else {
-                  SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-                  for (unsigned int k = 0; k < num_handles_; ++k) {
-                    if (argument_ids[k] != kInvalidAttributeID) {
-                      handles_[k]->updateStateUnary(
-                          accessor->getTypedValue(argument_ids[k]),
-                          value + payload_offsets_[k]);
-                    } else {
-                      handles_[k]->updateStateNullary(value +
-                                                      payload_offsets_[k]);
-                    }
-                  }
-                }
-              }
-            }
-            if (continuing) {
-              this->resize(0, variable_size);
-              accessor->previous();
-            }
+    bool continuing = true;
+    while (continuing) {
+      {
+        continuing = false;
+        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+        while (accessor->next()) {
+          aux_accessor->next();
+          // TODO(jianqiao): templatize to involve aux_accessor
+          if (this->GetCompositeKeyFromValueAccessor(*accessor,
+                                                     key_attr_ids,
+                                                     check_for_null_keys,
+                                                     &key_vector)) {
+            continue;
           }
-        } else {
-          while (accessor->next()) {
-            if (this->GetCompositeKeyFromValueAccessor(*accessor,
-                                                       key_attr_ids,
-                                                       check_for_null_keys,
-                                                       &key_vector)) {
-              continue;
-            }
-            variable_size =
-                this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-            std::uint8_t *value = this->upsertCompositeKeyInternalFast(
-                key_vector, nullptr, variable_size);
-            if (value == nullptr) {
-              return false;
-            } else {
-              SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-              for (unsigned int k = 0; k < num_handles_; ++k) {
-                if (argument_ids[k] != kInvalidAttributeID) {
+          variable_size =
+              this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          std::uint8_t *value = this->upsertCompositeKeyInternalFast(
+              key_vector, nullptr, variable_size);
+          if (value == nullptr) {
+            continuing = true;
+            break;
+          } else {
+            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+            for (unsigned int k = 0; k < num_handles_; ++k) {
+              const auto &ids = argument_ids[k];
+              if (ids.empty()) {
+                handles_[k]->updateStateNullary(value +
+                                                payload_offsets_[k]);
+              } else {
+                const attribute_id argument_id = ids.front();
+                if (argument_id >= 0) {
                   handles_[k]->updateStateUnary(
-                      accessor->getTypedValue(argument_ids[k]),
+                      accessor->getTypedValue(argument_id),
                       value + payload_offsets_[k]);
                 } else {
-                  handles_[k]->updateStateNullary(value +
-                                                  payload_offsets_[k]);
+                  handles_[k]->updateStateUnary(
+                      aux_accessor->getTypedValue(-(argument_id+2)),
+                      value + payload_offsets_[k]);
                 }
               }
             }
           }
         }
-
-        return true;
-      });
+      }
+      if (continuing) {
+        this->resize(0, variable_size);
+        accessor->previous();
+      }
+    }
+    return true;
+  });
 }
 
 template <bool resizable,

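
In the rewritten composite-key upsert above, each bucket's value is a packed
byte payload: a SpinMutex lives at the start (hence the reinterpret_cast before
locking) and each handle's aggregation state sits at a fixed payload_offsets_[k]
past it, updated in place while that per-bucket lock is held. A rough
self-contained analogue of the layout, using std::atomic_flag and std::memcpy in
place of Quickstep's SpinMutex and handle update calls (the SUM and COUNT states
and their offsets are made up for illustration):

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <new>
#include <vector>

// A tiny spinlock standing in for quickstep::SpinMutex.
struct SpinLock {
  SpinLock() { flag.clear(); }
  void lock() { while (flag.test_and_set(std::memory_order_acquire)) {} }
  void unlock() { flag.clear(std::memory_order_release); }
  std::atomic_flag flag;
};

int main() {
  // Payload layout: [SpinLock][double running SUM][int64 COUNT].
  const std::size_t sum_offset = sizeof(SpinLock);
  const std::size_t count_offset = sum_offset + sizeof(double);
  std::vector<std::uint8_t> payload(count_offset + sizeof(std::int64_t), 0);
  SpinLock *lock = new (payload.data()) SpinLock();  // lock at offset 0

  // One upsert: lock the bucket, then update each state at its offset.
  lock->lock();
  double sum;
  std::memcpy(&sum, payload.data() + sum_offset, sizeof(sum));
  sum += 42.0;
  std::memcpy(payload.data() + sum_offset, &sum, sizeof(sum));
  std::int64_t count;
  std::memcpy(&count, payload.data() + count_offset, sizeof(count));
  ++count;
  std::memcpy(payload.data() + count_offset, &count, sizeof(count));
  lock->unlock();

  std::cout << "sum=" << sum << " count=" << count << std::endl;
  return 0;
}
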
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index a3180bb..e3b8dbd 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -23,11 +23,13 @@
 #include <cstddef>
 #include <vector>
 
-#include "ValueAccessor.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+class ColumnVectorsValueAccessor;
+class ValueAccessor;
+
 /** \addtogroup Storage
  *  @{
  */
@@ -92,8 +94,9 @@ class HashTableBase {
    * specialization from this file.
    **/
   virtual bool upsertValueAccessorCompositeKeyFast(
-      const std::vector<attribute_id> &argument,
       ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<std::vector<attribute_id>> &argument_ids,
       const std::vector<attribute_id> &key_attr_ids,
       const bool check_for_null_keys) {
     return false;