You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/11 14:12:22 UTC

[3/3] incubator-quickstep git commit: Seperated calculation into two parts to check intermediate result

Seperated calculation into two parts to check intermediate result


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5f535ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5f535ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5f535ee

Branch: refs/heads/SQL-window-aggregation
Commit: d5f535ee9f0e3c7a8ea7615a07699b83c5f905f4
Parents: 2966405
Author: shixuan-fan <sh...@apache.org>
Authored: Mon Jul 11 14:11:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Mon Jul 11 14:11:47 2016 +0000

----------------------------------------------------------------------
 .../WindowAggregateFunction.hpp                 |   4 +
 .../WindowAggregateFunctionAvg.cpp              |   6 +-
 .../WindowAggregateFunctionAvg.hpp              |   2 +
 .../WindowAggregationHandle.hpp                 |  16 +-
 .../WindowAggregationHandleAvg.cpp              |  78 ++++++----
 .../WindowAggregationHandleAvg.hpp              |  11 +-
 .../WindowAggregationHandleAvg_unittest.cpp     | 148 ++++++++++++++++++-
 query_optimizer/ExecutionGenerator.cpp          |   5 +
 query_optimizer/resolver/Resolver.cpp           |   8 +-
 .../WindowAggregationOperator.cpp               |   3 +-
 storage/WindowAggregationOperationState.cpp     |  25 +++-
 storage/WindowAggregationOperationState.hpp     |   5 +-
 storage/WindowAggregationOperationState.proto   |   1 +
 13 files changed, 253 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 9cc5d74..84d97fc 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -26,10 +26,12 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+class CatalogRelationSchema;
 class WindowAggregationHandle;
 class Type;
 
@@ -128,6 +130,8 @@ class WindowAggregateFunction {
    *         is responsible for deleting the returned object.
    **/
   virtual WindowAggregationHandle* createHandle(
+      const CatalogRelationSchema &relation,
+      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index e9a4453..06ff1d9 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -72,13 +72,17 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
 }
 
 WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+    const CatalogRelationSchema &relation,
+    const std::vector<block_id> block_ids,
     std::vector<const Type*> &&argument_types,
     std::vector<const Type*> &&partition_key_types) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
       << " that AVG can not be applied to.";
 
-  return new WindowAggregationHandleAvg(*argument_types.front(),
+  return new WindowAggregationHandleAvg(relation,
+                                        block_ids,
+                                        *argument_types.front(),
                                         std::move(partition_key_types));
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 18e1022..91acf7e 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -57,6 +57,8 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
       const std::vector<const Type*> &argument_types) const override;
 
   WindowAggregationHandle* createHandle(
+      const CatalogRelationSchema &relation,
+      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 77b1e76..6b7988a 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -99,14 +99,13 @@ class WindowAggregationHandle {
    * @param output_destination The destination for output.
    **/
   virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                         const std::vector<block_id> &block_ids,
                          const std::vector<attribute_id> &partition_by_ids,
-                         const CatalogRelationSchema &relation,
                          const bool is_row,
                          const std::int64_t num_preceding,
                          const std::int64_t num_following,
-                         StorageManager *storage_manager,
-                         InsertDestinationInterface *output_destination) const = 0;
+                         StorageManager *storage_manager) = 0;
+
+  virtual std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager) = 0;
 
  protected:
   /**
@@ -119,7 +118,14 @@ class WindowAggregationHandle {
    * @param num_following The number of rows/range that follows the current row.
    * @param storage_manager A pointer to the storage manager.
    **/
-  WindowAggregationHandle() {}
+  WindowAggregationHandle(const CatalogRelationSchema &relation,
+                          const std::vector<block_id> block_ids)
+      : block_ids_(block_ids),
+        relation_(relation) {}
+
+  std::vector<ColumnVector*> window_aggregates_;
+  const std::vector<block_id> block_ids_;
+  const CatalogRelationSchema &relation_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 62f5a88..e6e2894 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -25,6 +25,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageManager.hpp"
@@ -51,9 +52,12 @@ namespace quickstep {
 class StorageManager;
 
 WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const CatalogRelationSchema &relation,
+    const std::vector<block_id> &block_ids,
     const Type &type,
     std::vector<const Type*> &&partition_key_types)
-    : argument_type_(type) {
+    : WindowAggregationHandle(relation, block_ids),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_id;
@@ -98,24 +102,20 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
 }
 
 void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                                           const std::vector<block_id> &block_ids,
                                            const std::vector<attribute_id> &partition_by_ids,
-                                           const CatalogRelationSchema &relation,
                                            const bool is_row,
                                            const std::int64_t num_preceding,
                                            const std::int64_t num_following,
-                                           StorageManager *storage_manager,
-                                           InsertDestinationInterface *output_destination) const {
+                                           StorageManager *storage_manager) {
   DCHECK(arguments.size() == 1);
-  DCHECK(!block_ids.empty());
   
   // Initialize the tuple accessors and argument accessors.
   // Index of each value accessor indicates the block it belongs to.
   std::vector<ValueAccessor*> tuple_accessors;
   std::vector<ColumnVectorsValueAccessor*> argument_accessors;
-  for (block_id bid : block_ids) {
+  for (block_id bid : block_ids_) {
     // Get tuple accessor.
-    BlockReference block = storage_manager->getBlock(bid, relation);
+    BlockReference block = storage_manager->getBlock(bid, relation_);
     const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
     ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
     tuple_accessors.push_back(tuple_accessor);
@@ -132,12 +132,14 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
 
   // Create a window for each tuple and calculate the window aggregate.
   for (std::uint32_t current_block_index = 0;
-       current_block_index < block_ids.size();
+       current_block_index < block_ids_.size();
        ++current_block_index) {
     ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
     ColumnVectorsValueAccessor* argument_accessor =
         argument_accessors[current_block_index];
-    
+    NativeColumnVector window_aggregates_for_block(*result_type_,
+                                                   argument_accessor->getNumTuples());
+
     InvokeOnAnyValueAccessor (
         tuple_accessor,
         [&] (auto *tuple_accessor) -> void {
@@ -145,24 +147,48 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
       argument_accessor->beginIteration();
       
       while (tuple_accessor->next() && argument_accessor->next()) {
-        TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
-                                                               argument_accessors,
-                                                               partition_by_ids,
-                                                               current_block_index,
-                                                               is_row,
-                                                               num_preceding,
-                                                               num_following);
-        Tuple *current_tuple = tuple_accessor->getTuple();
-        std::vector<TypedValue> new_tuple;
-        for (TypedValue value : *current_tuple) {
-          new_tuple.push_back(value);
-        }
-
-        new_tuple.push_back(window_aggregate);
-        output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+        const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+                                                                     argument_accessors,
+                                                                     partition_by_ids,
+                                                                     current_block_index,
+                                                                     is_row,
+                                                                     num_preceding,
+                                                                     num_following);
+        window_aggregates_for_block.appendTypedValue(window_aggregate);
       }
     });
+
+    window_aggregates_.push_back(&window_aggregates_for_block);
+  }
+}
+
+std::vector<ValueAccessor*>&& WindowAggregationHandleAvg::finalize(
+    StorageManager *storage_manager) {
+  std::vector<ValueAccessor*> accessors;
+  
+  // Create a ValueAccessor for each block, including the new window aggregate
+  // attribute.
+  for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+    // Get the block information.
+    BlockReference block = storage_manager->getBlock(block_idx, relation_);
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    ValueAccessor *block_accessor = tuple_block.createValueAccessor();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    ColumnVectorsValueAccessor accessor;
+
+    for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+         attr_it != relation_.end();
+         ++attr_it) {
+      ScalarAttribute scalar_attr(*attr_it);
+      accessor.addColumn(scalar_attr.getAllValues(block_accessor, &sub_block_ref));
+    }
+
+    accessors.push_back(&accessor);
   }
+
+  return std::move(accessors);
 }
 
 TypedValue WindowAggregationHandleAvg::calculateOneWindow(
@@ -287,7 +313,7 @@ bool WindowAggregationHandleAvg::samePartition(
     const std::vector<attribute_id> &partition_by_ids) const {
   return InvokeOnAnyValueAccessor (tuple_accessor,
                                    [&] (auto *tuple_accessor) -> bool {
-    for (std::uint32_t partition_by_index = 0;
+    for (std::size_t partition_by_index = 0;
          partition_by_index < partition_by_ids.size();
          ++partition_by_index) {
       if (!equal_comparators_[partition_by_index]->compareTypedValues(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 115152e..e0ec766 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -56,14 +56,13 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
   ~WindowAggregationHandleAvg() override {}
 
   void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                 const std::vector<block_id> &block_ids,
                  const std::vector<attribute_id> &partition_by_ids,
-                 const CatalogRelationSchema &relation,
                  const bool is_row,
                  const std::int64_t num_preceding,
                  const std::int64_t num_following,
-                 StorageManager *storage_manager,
-                 InsertDestinationInterface *output_destination) const;
+                 StorageManager *storage_manager);
+
+  std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager);
 
  private:
   friend class WindowAggregateFunctionAvg;
@@ -79,7 +78,9 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
    * @param storage_manager A pointer to the storage manager.
    * @param type Type of the avg value.
    **/
-  explicit WindowAggregationHandleAvg(const Type &type,
+  explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
+                                      const std::vector<block_id> &block_ids,
+                                      const Type &type,
                                       std::vector<const Type*> &&partition_key_types);
 
   TypedValue calculateOneWindow(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 6a7d161..326afa7 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -51,12 +51,142 @@ namespace quickstep {
 
 namespace {
 
-  constexpr int kNumSamples = 100;
+  constexpr int kNumTuplesPerBlock = 100;
+  constexpr int kNumBlocks = 5;
+  constexpr int kNumTuplesPerPartition = 8;
 
 }  // namespace
 
-class WindowAggregationHandleAvgTest : public::testing::Test {
+// Attribute value could be null if set true.
+class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
  protected:
+  virtual void SetUp() {
+    // Initialize relation and storage manager.
+    relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
+    storage_manager_.reset(new StorageManager("TestAvg"));
+
+    // Add All kinds of TypedValues.
+    CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
+                                                      "int_attr",
+                                                      TypeFactory::GetType(kInt, GetParam()));
+
+    relation_->addAttribute(int_attr);
+
+    CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
+                                                        "float_attr",
+                                                        TypeFactory::GetType(kFloat, GetParam()));
+    relation_->addAttribute(float_attr);
+
+    CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
+                                                       "long_attr",
+                                                       TypeFactory::GetType(kLong, GetParam()));
+    relation_->addAttribute(long_attr);
+
+    CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
+                                                         "double_attr",
+                                                         TypeFactory::GetType(kDouble, GetParam()));
+    relation_->addAttribute(double_attr);
+
+    CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
+                                                       "char_attr",
+                                                       TypeFactory::GetType(kChar, 4, GetParam()));
+    relation_->addAttribute(char_attr);
+
+    CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
+                                                          "varchar_attr",
+                                                          TypeFactory::GetType(kVarChar, 32, GetParam()));
+    relation_->addAttribute(varchar_attr);
+    
+    // Records the 'base_value' of a tuple used in createSampleTuple.
+    CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
+                                                             "partition_value",
+                                                             TypeFactory::GetType(kInt, false));
+    relation_->addAttribute(partition_value);
+
+    StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
+
+    // Initialize blocks.
+    for (int i = 0; i < kNumBlocks; ++i) {
+      block_id bid = storage_manager_->createBlock(relation_, layout);
+      relation_->addBlock(bid);
+      insertTuples(bid);
+    }
+  }
+
+  // Insert kNumTuplesPerBlock tuples into the block.
+  void insertTuples(block_id bid) {
+    MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
+    for (int i = 0; i < kNumTuplesPerBlock; ++i) {
+      Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
+      block->insertTuple(*tuple);
+    }
+  }
+
+  Tuple* createTuple(int base_value) {
+    std::vector<TypedValue> attrs;
+
+    // int_attr.
+    if (GetParam() && base_value % 10 == 0) {
+      // Throw in a NULL integer for every ten values.
+      attrs.emplace_back(kInt);
+    } else {
+      attrs.emplace_back(base_value);
+    }
+
+    // float_attr.
+    if (GetParam() && base_value % 10 == 1) {
+      attrs.emplace_back(kFloat);
+    } else {
+      attrs.emplace_back(static_cast<float>(0.4 * base_value));
+    }
+
+    // long_attr.
+    if (GetParam() && base_value % 10 == 2) {
+      attrs.emplace_back(kLong);
+    } else {
+      attrs.emplace_back(static_cast<std::int64_t>(base_value));
+    }
+
+    // double_attr.
+    if (GetParam() && base_value % 10 == 3) {
+      attrs.emplace_back(kDouble);
+    } else {
+      attrs.emplace_back(static_cast<double>(0.25 * base_value));
+    }
+
+    // char_attr
+    if (GetParam() && base_value % 10 == 4) {
+      attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
+    } else {
+      std::ostringstream char_buffer;
+      char_buffer << base_value;
+      std::string string_literal(char_buffer.str());
+      attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
+          string_literal.c_str(),
+          string_literal.size() > 3 ? 4
+                                    : string_literal.size() + 1));
+      attrs.back().ensureNotReference();
+    }
+
+    // varchar_attr
+    if (GetParam() && base_value % 10 == 5) {
+      attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
+    } else {
+      std::ostringstream char_buffer;
+      char_buffer << "Here are some numbers: " << base_value;
+      std::string string_literal(char_buffer.str());
+      attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
+          string_literal.c_str(),
+          string_literal.size() + 1));
+      attrs.back().ensureNotReference();
+    }
+
+    // base_value
+    attrs.emplace_back(base_value / kNumTuplesPerPartition);
+    return new Tuple(std::move(attrs));
+}
+  }
+  
   // Handle initialization.
   void initializeHandle(const Type &argument_type,
                         const std::vector<const Type*> &partition_key_types) {
@@ -86,11 +216,13 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
   }
 
   template <typename CppType>
-  static void CheckAvgValue(
-      CppType expected,
-      const AggregationHandle &handle,
-      const AggregationState &state) {
-    EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+  static void CheckAvgValues(
+      std::vector<CppType> expected,
+      const ColumnVector* actual) {
+    EXPECT_EQ(expected.size(), actual->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+    }
   }
 
   // Static templated method for set a meaningful value to data types.
@@ -237,6 +369,8 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
   std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
   std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<CatalogRelation> relation_;
+  std::vector<block_id> block_ids_;
 };
 
 const int AggregationHandleAvgTest::kNumSamples;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..06d47d2 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,6 +1656,11 @@ void ExecutionGenerator::convertWindowAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
+  // Get relation blocks.
+  for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
+    window_aggr_state_proto->add_block_ids(bid);
+  }
+
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
       physical_plan->window_aggregate_expression();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 11348fe..d28213c 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -2630,8 +2630,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   // TODO(Shixuan): We might want to create a new abstract class Function to
   // include both AggregateFunction and WindowAggregateFunction, which will make
   // this part of code cleaner.
-  const ::quickstep::AggregateFunction *aggregate;
-  const ::quickstep::WindowAggregateFunction *window_aggregate;
+  const ::quickstep::AggregateFunction *aggregate = nullptr;
+  const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
   if (parse_function_call.isWindow()) {
     window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
   } else {
@@ -2670,7 +2670,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
       || (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
     if ((resolved_arguments.empty()) && !count_star) {
       THROW_SQL_ERROR_AT(&parse_function_call)
-          << "COUNT function requires an argument (either scalar or star (*))";
+          << "COUNT aggregate requires an argument (either scalar or star (*))";
     }
   }
 
@@ -2684,7 +2684,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
       || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
-        << "Function " << aggregate->getName()
+        << "Aggregate function " << aggregate->getName()
         << " can not apply to the given argument(s).";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index ec2f27c..4c2e2b5 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -81,8 +81,7 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  state_->windowAggregateBlocks(output_destination_,
-                                block_ids_);
+  state_->windowAggregateBlocks(output_destination_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 8d05b79..9029bd4 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,6 +47,7 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
+    std::vector<block_id> &&block_ids,
     const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -55,6 +56,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
+      block_ids_(std::move(block_ids)),
       arguments_(std::move(arguments)),
       is_row_(is_row),
       num_preceding_(num_preceding),
@@ -82,7 +84,9 @@ WindowAggregationOperationState::WindowAggregationOperationState(
 
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(std::move(argument_types),
+      window_aggregate_function->createHandle(input_relation_,
+                                              block_ids_,
+                                              std::move(argument_types),
                                               std::move(partition_by_types)));
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -109,6 +113,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
     StorageManager *storage_manager) {
   DCHECK(ProtoIsValid(proto, database));
 
+  std::vector<block_id> block_ids;
+  for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
+    block_ids.push_back(proto.block_ids(block_idx));
+  }
+
   // Rebuild contructor arguments from their representation in 'proto'.
   const WindowAggregateFunction *window_aggregate_function
       = &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -135,6 +144,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_following = proto.num_following();
 
   return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+                                             std::move(block_ids),
                                              window_aggregate_function,
                                              std::move(arguments),
                                              std::move(partition_by_attributes),
@@ -185,17 +195,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
 }
 
 void WindowAggregationOperationState::windowAggregateBlocks(
-    InsertDestination *output_destination,
-    const std::vector<block_id> &block_ids) {
+    InsertDestination *output_destination) {
   window_aggregation_handle_->calculate(arguments_,
-                                        block_ids,
                                         partition_by_ids_,
-                                        input_relation_,
                                         is_row_,
                                         num_preceding_,
                                         num_following_,
-                                        storage_manager_,
-                                        output_destination);
+                                        storage_manager_);
+  std::vector<ValueAccessor*> output_accessors(
+      window_aggregation_handle_->finalize(storage_manager_));
+  for (ValueAccessor* output_accessor : output_accessors) {
+    output_destination->bulkInsertTuples(output_accessor);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index f38dbd8..8116410 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,6 +67,7 @@ class WindowAggregationOperationState {
    *        tables.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+                                  std::vector<block_id> &&block_ids,
                                   const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -112,11 +113,11 @@ class WindowAggregationOperationState {
    * @param output_destination The output destination for the computed window
    *                           aggregate.
    **/
-  void windowAggregateBlocks(InsertDestination *output_destination,
-                             const std::vector<block_id> &block_ids);
+  void windowAggregateBlocks(InsertDestination *output_destination);
 
  private:
   const CatalogRelationSchema &input_relation_;
+  const std::vector<block_id> block_ids_;
 
   // TODO(Shixuan): Handle and State for window aggregation will be needed for
   //                actual calculation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index 4dc0a6a..c3b672c 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,6 +24,7 @@ import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
   required int32 input_relation_id = 1;
+  repeated fixed64 block_ids = 2;
   required WindowAggregateFunction function = 3;
   repeated Scalar arguments = 4;
   repeated Scalar partition_by_attributes = 5;