You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/13 21:26:54 UTC

incubator-quickstep git commit: Removed finalize()

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation c50ce5139 -> 0f5c41d15


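Removed finalize(): calculate() now returns the computed window-aggregate ColumnVector directly, and WindowAggregationHandle no longer stores tuple_accessor_ or window_aggregates_ as members.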


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0f5c41d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0f5c41d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0f5c41d1

Branch: refs/heads/SQL-window-aggregation
Commit: 0f5c41d1531032065e747b642b9013802f74ac06
Parents: c50ce51
Author: shixuan-fan <sh...@apache.org>
Authored: Wed Jul 13 21:26:29 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Wed Jul 13 21:26:29 2016 +0000

----------------------------------------------------------------------
 .../WindowAggregationHandle.hpp                 |  16 +-
 .../WindowAggregationHandleAvg.cpp              |  47 +-
 .../WindowAggregationHandleAvg.hpp              |  18 +-
 .../WindowAggregationHandleAvg_unittest.cpp     | 600 +++----------------
 storage/WindowAggregationOperationState.cpp     |  19 +-
 5 files changed, 140 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 8511b9e..831bcbf 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -102,14 +102,12 @@ class WindowAggregationHandle {
    *                          NULL if all arguments are attributes.
    * @param output_destination The destination for output.
    **/
-  virtual void calculate(ColumnVectorsValueAccessor* block_accessors,
-                         std::vector<ColumnVector*> &&arguments,
-                         const std::vector<attribute_id> &partition_by_ids,
-                         const bool is_row,
-                         const std::int64_t num_preceding,
-                         const std::int64_t num_following) = 0;
-
-  virtual ValueAccessor* finalize() = 0;
+  virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                                  std::vector<ColumnVector*> &&arguments,
+                                  const std::vector<attribute_id> &partition_by_ids,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following) = 0;
 
  protected:
   /**
@@ -133,8 +131,6 @@ class WindowAggregationHandle {
     }
   }
 
-  std::unique_ptr<ColumnVectorsValueAccessor> tuple_accessor_;
-  std::unique_ptr<NativeColumnVector> window_aggregates_;
   const CatalogRelationSchema &relation_;
   std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 7daaddf..14fc1d9 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -88,22 +88,21 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
           .makeUncheckedBinaryOperatorForTypes(*sum_type_, TypeFactory::GetType(kDouble)));
 }
 
-void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_accessor,
-                                           std::vector<ColumnVector*> &&arguments,
-                                           const std::vector<attribute_id> &partition_by_ids,
-                                           const bool is_row,
-                                           const std::int64_t num_preceding,
-                                           const std::int64_t num_following) {
+ColumnVector* WindowAggregationHandleAvg::calculate(
+    ColumnVectorsValueAccessor *tuple_accessor,
+    std::vector<ColumnVector*> &&arguments,
+    const std::vector<attribute_id> &partition_by_ids,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) {
   DCHECK(arguments.size() == 1);
   DCHECK(arguments[0]->isNative());
   DCHECK(static_cast<std::size_t>(tuple_accessor->getNumTuples()) ==
          static_cast<const NativeColumnVector*>(arguments[0])->size());
-
-  tuple_accessor_.reset(tuple_accessor);
   
   // Initialize the output column and argument accessor.
-  window_aggregates_.reset(
-      new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples()));
+  NativeColumnVector *window_aggregates =
+      new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
   ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
   argument_accessor->addColumn(arguments[0]);
   
@@ -111,22 +110,21 @@ void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_acc
   tuple_accessor->beginIteration();
   argument_accessor->beginIteration();
       
-  while (tuple_accessor_->next() && argument_accessor->next()) {
-    const TypedValue window_aggregate = this->calculateOneWindow(argument_accessor,
+  while (tuple_accessor->next() && argument_accessor->next()) {
+    const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
+                                                                 argument_accessor,
                                                                  partition_by_ids,
                                                                  is_row,
                                                                  num_preceding,
                                                                  num_following);
-    window_aggregates_->appendTypedValue(window_aggregate);
+    window_aggregates->appendTypedValue(window_aggregate);
   }
-}
 
-ValueAccessor* WindowAggregationHandleAvg::finalize() {
-  tuple_accessor_->addColumn(window_aggregates_.release());
-  return tuple_accessor_.get();
+  return window_aggregates;
 }
 
 TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    ColumnVectorsValueAccessor *tuple_accessor,
     ColumnVectorsValueAccessor *argument_accessor,
     const std::vector<attribute_id> &partition_by_ids,
     const bool is_row,
@@ -149,11 +147,11 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
   std::vector<TypedValue> current_row_partition_key;
   for (attribute_id partition_by_id : partition_by_ids) {
     current_row_partition_key.push_back(
-        tuple_accessor_->getTypedValue(partition_by_id));
+        tuple_accessor->getTypedValue(partition_by_id));
   }
 
   // Get current position.
-  tuple_id current_tuple_id = tuple_accessor_->getCurrentPositionVirtual();
+  tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
   
   // Find preceding tuples.
   int count_preceding = 0;
@@ -168,7 +166,8 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
 
     // Get the partition keys and compare. If not the same partition as the
     // current row, stop searching preceding tuples.
-    if (!samePartition(current_row_partition_key,
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
                        preceding_tuple_id,
                        partition_by_ids)) {
       break;
@@ -196,13 +195,14 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
     following_tuple_id++;
 
     // No more following tuples.
-    if (following_tuple_id == tuple_accessor_->getNumTuples()) {
+    if (following_tuple_id == tuple_accessor->getNumTuples()) {
       break;
     }
 
     // Get the partition keys and compare. If not the same partition as the
     // current row, stop searching preceding tuples.
-    if (!samePartition(current_row_partition_key,
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
                        following_tuple_id,
                        partition_by_ids)) {
       break;
@@ -229,6 +229,7 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
 }
 
 bool WindowAggregationHandleAvg::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
     const std::vector<TypedValue> &current_row_partition_key,
     const tuple_id boundary_tuple_id,
     const std::vector<attribute_id> &partition_by_ids) const {
@@ -237,7 +238,7 @@ bool WindowAggregationHandleAvg::samePartition(
        ++partition_by_index) {
     if (!equal_comparators_[partition_by_index]->compareTypedValues(
             current_row_partition_key[partition_by_index],
-            tuple_accessor_->getTypedValueAtAbsolutePosition(
+            tuple_accessor->getTypedValueAtAbsolutePosition(
                 partition_by_ids[partition_by_index], boundary_tuple_id))) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 4eb0846..72076fa 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -55,14 +55,12 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
  public:
   ~WindowAggregationHandleAvg() override {}
 
-  void calculate(ColumnVectorsValueAccessor* block_accessors,
-                 std::vector<ColumnVector*> &&arguments,
-                 const std::vector<attribute_id> &partition_by_ids,
-                 const bool is_row,
-                 const std::int64_t num_preceding,
-                 const std::int64_t num_following);
-
-  ValueAccessor* finalize() override;
+  ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                          std::vector<ColumnVector*> &&arguments,
+                          const std::vector<attribute_id> &partition_by_ids,
+                          const bool is_row,
+                          const std::int64_t num_preceding,
+                          const std::int64_t num_following);
 
  private:
   friend class WindowAggregateFunctionAvg;
@@ -83,13 +81,15 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
                                       const Type &type);
 
   TypedValue calculateOneWindow(
+      ColumnVectorsValueAccessor *tuple_accessor,
       ColumnVectorsValueAccessor *argument_accessor,
       const std::vector<attribute_id> &partition_by_ids,
       const bool is_row,
       const std::int64_t num_preceding,
       const std::int64_t num_following) const;
 
-  bool samePartition(const std::vector<TypedValue> &current_row_partition_key,
+  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+                     const std::vector<TypedValue> &current_row_partition_key,
                      const tuple_id boundary_tuple_id,
                      const std::vector<attribute_id> &partition_by_ids) const;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 8fd3c8a..58c8019 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -28,7 +28,6 @@
 #include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
 #include "expressions/window_aggregation/WindowAggregationID.hpp"
-#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DateOperatorOverloads.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -51,141 +50,15 @@ namespace quickstep {
 
 namespace {
 
-  constexpr int kNumTuplesPerBlock = 100;
-  constexpr int kNumBlocks = 5;
+  constexpr int kNumTuples = 100;
   constexpr int kNumTuplesPerPartition = 8;
+  constexpr int kNullInterval = 25;
 
 }  // namespace
 
 // Attribute value could be null if set true.
 class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
- protected:
-  virtual void SetUp() {
-    // Initialize relation and storage manager.
-    relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
-    storage_manager_.reset(new StorageManager("TestAvg"));
-
-    // Add All kinds of TypedValues.
-    CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
-                                                      "int_attr",
-                                                      TypeFactory::GetType(kInt, GetParam()));
-
-    relation_->addAttribute(int_attr);
-
-    CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
-                                                        "float_attr",
-                                                        TypeFactory::GetType(kFloat, GetParam()));
-    relation_->addAttribute(float_attr);
-
-    CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
-                                                       "long_attr",
-                                                       TypeFactory::GetType(kLong, GetParam()));
-    relation_->addAttribute(long_attr);
-
-    CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
-                                                         "double_attr",
-                                                         TypeFactory::GetType(kDouble, GetParam()));
-    relation_->addAttribute(double_attr);
-
-    CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
-                                                       "char_attr",
-                                                       TypeFactory::GetType(kChar, 4, GetParam()));
-    relation_->addAttribute(char_attr);
-
-    CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
-                                                          "varchar_attr",
-                                                          TypeFactory::GetType(kVarChar, 32, GetParam()));
-    relation_->addAttribute(varchar_attr);
-    
-    // Records the 'base_value' of a tuple used in createSampleTuple.
-    CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
-                                                             "partition_value",
-                                                             TypeFactory::GetType(kInt, false));
-    relation_->addAttribute(partition_value);
-
-    StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
-
-    // Initialize blocks.
-    for (int i = 0; i < kNumBlocks; ++i) {
-      block_id bid = storage_manager_->createBlock(relation_, layout);
-      relation_->addBlock(bid);
-      insertTuples(bid);
-    }
-  }
-
-  // Insert kNumTuplesPerBlock tuples into the block.
-  void insertTuples(block_id bid) {
-    MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
-    for (int i = 0; i < kNumTuplesPerBlock; ++i) {
-      Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
-      block->insertTuple(*tuple);
-    }
-  }
-
-  Tuple* createTuple(int base_value) {
-    std::vector<TypedValue> attrs;
-
-    // int_attr.
-    if (GetParam() && base_value % 10 == 0) {
-      // Throw in a NULL integer for every ten values.
-      attrs.emplace_back(kInt);
-    } else {
-      attrs.emplace_back(base_value);
-    }
-
-    // float_attr.
-    if (GetParam() && base_value % 10 == 1) {
-      attrs.emplace_back(kFloat);
-    } else {
-      attrs.emplace_back(static_cast<float>(0.4 * base_value));
-    }
-
-    // long_attr.
-    if (GetParam() && base_value % 10 == 2) {
-      attrs.emplace_back(kLong);
-    } else {
-      attrs.emplace_back(static_cast<std::int64_t>(base_value));
-    }
-
-    // double_attr.
-    if (GetParam() && base_value % 10 == 3) {
-      attrs.emplace_back(kDouble);
-    } else {
-      attrs.emplace_back(static_cast<double>(0.25 * base_value));
-    }
-
-    // char_attr
-    if (GetParam() && base_value % 10 == 4) {
-      attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
-    } else {
-      std::ostringstream char_buffer;
-      char_buffer << base_value;
-      std::string string_literal(char_buffer.str());
-      attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
-          string_literal.c_str(),
-          string_literal.size() > 3 ? 4
-                                    : string_literal.size() + 1));
-      attrs.back().ensureNotReference();
-    }
-
-    // varchar_attr
-    if (GetParam() && base_value % 10 == 5) {
-      attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
-    } else {
-      std::ostringstream char_buffer;
-      char_buffer << "Here are some numbers: " << base_value;
-      std::string string_literal(char_buffer.str());
-      attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
-          string_literal.c_str(),
-          string_literal.size() + 1));
-      attrs.back().ensureNotReference();
-    }
-
-    // base_value
-    attrs.emplace_back(base_value / kNumTuplesPerPartition);
-    return new Tuple(std::move(attrs));
-  }
-  
+ protected: 
   // Handle initialization.
   void initializeHandle(const Type &argument_type,
                         const std::vector<const Type*> &partition_key_types) {
@@ -217,14 +90,18 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
 
   template <typename CppType>
   static void CheckAvgValues(
-      std::vector<CppType> expected,
+      std::vector<CppType*> expected,
       const ColumnVector *actual) {
     EXPECT_TRUE(actual->isNative());
     NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
 
     EXPECT_EQ(expected.size(), actual->size());
     for (std::size_t i = 0; i < expected.size(); ++i) {
-      EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+      if (expected[i] == nullptr) {
+        EXPECT_TRUE(actual->getTypedValue(i).isNull());
+      } else {
+        EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+      }
     }
   }
 
@@ -238,405 +115,110 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
   void checkAggregationAvgGeneric() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(relation_, storage_manager_).empty());
-
-    aggregation_handle_avg_->calculate(relation_.getBlocksSnapshot(),
-                                       std::vector<GenericType
-                                       
+    EXPECT_EQ(0, aggregation_handle_avg_->finalize()->getNumTuplesVirtual());
+
+    // Create argument, partition key and cpptype vectors.
+    std::vector<GenericType::cpptype*> argument_cpp_vector;
+    argument_cpp_vector.reserve(kNumTuples);
+    ColumnVector *argument_type_vector =
+        createArgumentGeneric<GenericType>(&argument_cpp_vector);
+    const IntType &int_type = ;
+    NativeColumnVector *partition_key_vector =
+      new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+        
+    for (int i = 0; i < kNumTuples; ++i) {
+      partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+    }
 
-    std::vector<OutputType> result_vector;
-    typename GenericType::cpptype val;
-    typename GenericType::cpptype sum;
-    SetDataType(0, &sum);
+    // Create tuple ValueAccessor
+    ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
+    tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(argument_type_vector);
 
-    for (int i = 0; i < kNumSamples; ++i) {
-      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
-        SetDataType(i - 10, &val);
-      } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
-      }
-      iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
-      sum += val;
-    }
-    iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
-    CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-                                                *aggregation_handle_avg_,
-                                                *aggregation_handle_avg_state_);
+    // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+    calculateAccumulative<GenericType, OutputType>(tuple_accessor,
+                                                   argument_type_vector,
+                                                   argument_cpp_vector);
   }
 
   template <typename GenericType>
-  ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
-    NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
-
-    typename GenericType::cpptype val;
-    SetDataType(0, sum);
+  ColumnVector *createArgumentGeneric(
+      std::vector<GenericType::cpptype*> *argument_cpp_vector) {
+    const GenericType &type = GenericType::Instance(true);
+    NativeColumnVector *column = new NativeColumnVector(type, kNumSamples);
 
     column->appendTypedValue(type.makeNullValue());
     for (int i = 0; i < kNumSamples; ++i) {
+      // Insert a NULL every kNullInterval tuples.
+      if (i % kNullInterval == 0) {
+        argument_cpp_vector->push_back(nullptr);
+        column->appendTypedValue(type.makeNullValue());
+        continue;
+      }
+
+      typename GenericType::cpptype val = new GenericType::cpptype;
+      
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
-        SetDataType(i - 10, &val);
+        SetDataType(i - 10, val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
-      }
-      column->appendTypedValue(type.makeValue(&val));
-      *sum += val;
-      // One NULL in the middle.
-      if (i == kNumSamples/2) {
-        column->appendTypedValue(type.makeNullValue());
+        SetDataType(static_cast<float>(i - 10) / 10, val);
       }
+      
+      column->appendTypedValue(type.makeValue(val));
+      argument_cpp_vector->push_back(val);
     }
-    column->appendTypedValue(type.makeNullValue());
 
     return column;
   }
 
-  template <typename GenericType, typename OutputType = DoubleType>
-  void checkAggregationAvgGenericColumnVector() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
-
-    typename GenericType::cpptype sum;
-    SetDataType(0, &sum);
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *cv_state);
-
-    aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *aggregation_handle_avg_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename GenericType, typename OutputType = DoubleType>
-  void checkAggregationAvgGenericValueAccessor() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
-
+  template <typename GenericType, typename OutputType>
+  void calculateAccumulate(ValueAccessor *tuple_accessor,
+                           ColumnVector *argument_type_vector,
+                           const std::vector<GenericType::cpptype> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+    
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true  /* is_row */,
+                               -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                               0  /* num_following: CURRENT ROW */);
+                           
+    // Get the cpptype result.
+    std::vector<OutputType::cpptype*> result_cpp_vector;
+    bool is_null;
     typename GenericType::cpptype sum;
-    SetDataType(0, &sum);
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
-    accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
+    int count;
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      // Start of new partition
+      if (i % kNumTuplesPerPartition == 0) {
+        is_null = false;
+        SetDataType(0, &sum);
+        count = 0;
+      }
 
-    std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
-                                                         std::vector<attribute_id>(1, 0)));
+      typename GenericType::cpptype *value = argument_cpp_vector[i];
+      if (value == nullptr) {
+        is_null = true;
+      }
 
-    // Test the state generated directly by accumulateValueAccessor(), and also
-    // test after merging back.
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *va_state);
+      if (is_null) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        sum += *value;
+        count++;
+        result_cpp_vector.push_back(static_cast<typename OutputType>(sum) / count);
+      }
+    }
 
-    aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
-    CheckAvgValue<typename OutputType::cpptype>(
-        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-        *aggregation_handle_avg_,
-        *aggregation_handle_avg_state_);
+    CheckAvgValues(result_cpp_vector, result);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
-  std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
-  std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
-  std::unique_ptr<StorageManager> storage_manager_;
-  std::unique_ptr<CatalogRelation> relation_;
+  std::unique_ptr<WindowAggregationHandle> handle_avg_;
 };
 
-const int AggregationHandleAvgTest::kNumSamples;
-
-template <>
-void AggregationHandleAvgTest::CheckAvgValue<double>(
-    double expected,
-    const AggregationHandle &handle,
-    const AggregationState &state) {
-  EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral<double>());
-}
-
-template <>
-void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
-  data->interval_ticks = value;
-}
-
-template <>
-void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
-  data->months = value;
-}
-
-typedef AggregationHandleAvgTest AggregationHandleAvgDeathTest;
-
-TEST_F(AggregationHandleAvgTest, IntTypeTest) {
-  checkAggregationAvgGeneric<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeTest) {
-  checkAggregationAvgGeneric<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeTest) {
-  checkAggregationAvgGeneric<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeTest) {
-  checkAggregationAvgGeneric<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeTest) {
-  checkAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) {
-  checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-#ifdef QUICKSTEP_DEBUG
-TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {
-  const Type &type = CharType::Instance(true, 10);
-  EXPECT_DEATH(initializeHandle(type), "");
-}
-
-TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) {
-  const Type &type = VarCharType::Instance(true, 10);
-  EXPECT_DEATH(initializeHandle(type), "");
-}
-
-TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
-  const Type &int_non_null_type = IntType::Instance(false);
-  const Type &long_type = LongType::Instance(true);
-  const Type &double_type = DoubleType::Instance(true);
-  const Type &float_type = FloatType::Instance(true);
-  const Type &char_type = CharType::Instance(true, 10);
-  const Type &varchar_type = VarCharType::Instance(true, 10);
-
-  initializeHandle(IntType::Instance(true));
-  int int_val = 0;
-  std::int64_t long_val = 0;
-  double double_val = 0;
-  float float_val = 0;
-
-  iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
-
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
-
-  // Test mergeStates() with incorrectly typed handles.
-  std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
-      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
-          std::vector<const Type*>(1, &double_type)));
-  std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
-      aggregation_handle_avg_double->createInitialState());
-  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
-      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
-      double_type.makeValue(&double_val));
-  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
-                                                    aggregation_handle_avg_state_.get()),
-               "");
-
-  std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
-      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
-          std::vector<const Type*>(1, &float_type)));
-  std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
-      aggregation_handle_avg_float->createInitialState());
-  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
-      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
-      float_type.makeValue(&float_val));
-  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
-                                                    aggregation_handle_avg_state_.get()),
-               "");
-}
-#endif
-
-TEST_F(AggregationHandleAvgTest, canApplyToTypeTest) {
-  EXPECT_TRUE(ApplyToTypesTest(kInt));
-  EXPECT_TRUE(ApplyToTypesTest(kLong));
-  EXPECT_TRUE(ApplyToTypesTest(kFloat));
-  EXPECT_TRUE(ApplyToTypesTest(kDouble));
-  EXPECT_FALSE(ApplyToTypesTest(kChar));
-  EXPECT_FALSE(ApplyToTypesTest(kVarChar));
-  EXPECT_FALSE(ApplyToTypesTest(kDatetime));
-  EXPECT_TRUE(ApplyToTypesTest(kDatetimeInterval));
-  EXPECT_TRUE(ApplyToTypesTest(kYearMonthInterval));
-}
-
-TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kInt, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
-}
-
-TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
-  const Type &long_non_null_type = LongType::Instance(false);
-  initializeHandle(long_non_null_type);
-  storage_manager_.reset(new StorageManager("./test_avg_data"));
-  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      aggregation_handle_avg_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
-          std::vector<const Type *>(1, &long_non_null_type),
-          10,
-          storage_manager_.get()));
-  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      aggregation_handle_avg_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
-          std::vector<const Type *>(1, &long_non_null_type),
-          10,
-          storage_manager_.get()));
-
-  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
-          destination_hash_table.get());
-
-  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
-          source_hash_table.get());
-
-  AggregationHandleAvg *aggregation_handle_avg_derived =
-      static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
-  // We create three keys: first is present in both the hash tables, second key
-  // is present only in the source hash table while the third key is present
-  // the destination hash table only.
-  std::vector<TypedValue> common_key;
-  common_key.emplace_back(static_cast<std::int64_t>(0));
-  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
-  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
-  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
-
-  const std::int64_t common_key_source_avg = 355;
-  TypedValue common_key_source_avg_val(common_key_source_avg);
-
-  const std::int64_t common_key_destination_avg = 295;
-  TypedValue common_key_destination_avg_val(common_key_destination_avg);
-
-  const std::int64_t exclusive_key_source_avg = 1;
-  TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
-
-  const std::int64_t exclusive_key_destination_avg = 1;
-  TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
-
-  std::unique_ptr<AggregationStateAvg> common_key_source_state(
-      static_cast<AggregationStateAvg *>(
-          aggregation_handle_avg_->createInitialState()));
-  std::unique_ptr<AggregationStateAvg> common_key_destination_state(
-      static_cast<AggregationStateAvg *>(
-          aggregation_handle_avg_->createInitialState()));
-  std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
-      static_cast<AggregationStateAvg *>(
-          aggregation_handle_avg_->createInitialState()));
-  std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
-      static_cast<AggregationStateAvg *>(
-          aggregation_handle_avg_->createInitialState()));
-
-  // Create avg value states for keys.
-  aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
-                                                  common_key_source_avg_val);
-
-  aggregation_handle_avg_derived->iterateUnaryInl(
-      common_key_destination_state.get(), common_key_destination_avg_val);
-
-  aggregation_handle_avg_derived->iterateUnaryInl(
-      exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
-
-  aggregation_handle_avg_derived->iterateUnaryInl(
-      exclusive_key_source_state.get(), exclusive_key_source_avg_val);
-
-  // Add the key-state pairs to the hash tables.
-  source_hash_table_derived->putCompositeKey(common_key,
-                                             *common_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      common_key, *common_key_destination_state);
-  source_hash_table_derived->putCompositeKey(exclusive_source_key,
-                                             *exclusive_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      exclusive_destination_key, *exclusive_key_destination_state);
-
-  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
-  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
-
-  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
-                                                  destination_hash_table.get());
-
-  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
-
-  CheckAvgValue<double>(
-      (common_key_destination_avg_val.getLiteral<std::int64_t>() +
-          common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
-      *aggregation_handle_avg_derived,
-      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
-  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
-                  *aggregation_handle_avg_derived,
-                  *(destination_hash_table_derived->getSingleCompositeKey(
-                      exclusive_destination_key)));
-  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
-                  *aggregation_handle_avg_derived,
-                  *(source_hash_table_derived->getSingleCompositeKey(
-                      exclusive_source_key)));
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index de8cfeb..ea522b8 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -280,15 +280,16 @@ void WindowAggregationOperationState::windowAggregateBlocks(
   }
 
   // Do actual calculation in handle.
-  window_aggregation_handle_->calculate(all_blocks_accessor,
-                                        std::move(argument_vecs),
-                                        partition_by_ids_,
-                                        is_row_,
-                                        num_preceding_,
-                                        num_following_);
-
-  ValueAccessor* output_accessor = window_aggregation_handle_->finalize();
-  output_destination->bulkInsertTuples(output_accessor);
+  ColumnVector *window_aggregates =
+      window_aggregation_handle_->calculate(all_blocks_accessor,
+                                            std::move(argument_vecs),
+                                            partition_by_ids_,
+                                            is_row_,
+                                            num_preceding_,
+                                            num_following_);
+
+  all_blocks_accessor->addColumn(window_aggregates);
+  output_destination->bulkInsertTuples(all_blocks_accessor);
 }
 
 }  // namespace quickstep