You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:38 UTC

[14/30] incubator-quickstep git commit: - Supported ROWS mode for AVG window aggregation. - Created WindowAggregateFunctions in expressions/window_aggregation. - Created WindowAggregationHandle for AVG to actually do the calculation. - Other functions wi

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..c044a98
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,387 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *   limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+  constexpr int kNumTuples = 100;
+  constexpr int kNumTuplesPerPartition = 8;
+  constexpr int kNullInterval = 25;
+  constexpr int kNumPreceding = 2;
+  constexpr int kNumFollowing = 2;
+
+}  // namespace
+
+// Attribute value could be null if set true.
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+  // Handle initialization.
+  void initializeHandle(const Type &argument_type) {
+    const WindowAggregateFunction &function =
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
+    handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
+                                            std::move(partition_key_types)));
+  }
+
+  // Test canApplyToTypes().
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const Type &type = (typeID == kChar || typeID == kVarChar) ?
+        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
+        TypeFactory::GetType(typeID);
+
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+        std::vector<const Type*>(1, &type));
+  }
+
+  // Test resultTypeForArgumentTypes().
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                             TypeID output_type_id) {
+    const Type *result_type
+        = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    return (result_type->getTypeID() == output_type_id);
+  }
+
+  template <typename CppType>
+  static void CheckAvgValues(
+      const std::vector<CppType*> &expected,
+      const ColumnVector *actual) {
+    EXPECT_TRUE(actual->isNative());
+    const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+    EXPECT_EQ(expected.size(), native->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      if (expected[i] == nullptr) {
+        EXPECT_TRUE(native->getTypedValue(i).isNull());
+      } else {
+        EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral<CppType>());
+      }
+    }
+  }
+
+  // Static templated methods for set a meaningful value to data types.
+  template <typename CppType>
+  static void SetDataType(int value, CppType *data) {
+    *data = value;
+  }
+
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkWindowAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+
+    // Create argument, partition key and cpptype vectors.
+    std::vector<typename GenericType::cpptype*> argument_cpp_vector;
+    argument_cpp_vector.reserve(kNumTuples);
+    ColumnVector *argument_type_vector =
+        createArgumentGeneric<GenericType>(&argument_cpp_vector);
+    NativeColumnVector *partition_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+
+    for (int i = 0; i < kNumTuples; ++i) {
+      partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+    }
+
+    // Create tuple ValueAccessor.
+    ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
+    tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(argument_type_vector);
+
+    // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+    checkAccumulate<GenericType, OutputType>(tuple_accessor,
+                                             argument_type_vector,
+                                             argument_cpp_vector);
+    // Test kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+    checkSlidingWindow<GenericType, OutputType>(tuple_accessor,
+                                                argument_type_vector,
+                                                argument_cpp_vector);
+  }
+
+  template <typename GenericType>
+  ColumnVector *createArgumentGeneric(
+      std::vector<typename GenericType::cpptype*> *argument_cpp_vector) {
+    const GenericType &type = GenericType::Instance(true);
+    NativeColumnVector *column = new NativeColumnVector(type, kNumTuples);
+
+    for (int i = 0; i < kNumTuples; ++i) {
+      // Insert a NULL every kNullInterval tuples.
+      if (i % kNullInterval == 0) {
+        argument_cpp_vector->push_back(nullptr);
+        column->appendTypedValue(type.makeNullValue());
+        continue;
+      }
+
+      typename GenericType::cpptype *val = new typename GenericType::cpptype;
+
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, val);
+      } else {
+        SetDataType(static_cast<float>(i - 10) / 10, val);
+      }
+
+      column->appendTypedValue(type.makeValue(val));
+      argument_cpp_vector->push_back(val);
+    }
+
+    return column;
+  }
+
+  template <typename GenericType, typename OutputType>
+  void checkAccumulate(ColumnVectorsValueAccessor *tuple_accessor,
+                       ColumnVector *argument_type_vector,
+                       const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true  /* is_row */,
+                               -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                               0  /* num_following: CURRENT ROW */);
+
+    // Get the cpptype result.
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    typename GenericType::cpptype sum;
+    int count;
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      // Start of new partition
+      if (i % kNumTuplesPerPartition == 0) {
+        SetDataType(0, &sum);
+        count = 0;
+      }
+
+      typename GenericType::cpptype *value = argument_cpp_vector[i];
+      if (value != nullptr) {
+        sum += *value;
+        count++;
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+        result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  template <typename GenericType, typename OutputType>
+  void checkSlidingWindow(ColumnVectorsValueAccessor *tuple_accessor,
+                          ColumnVector *argument_type_vector,
+                          const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true  /* is_row */,
+                               kNumPreceding,
+                               kNumFollowing);
+
+    // Get the cpptype result.
+    // For each value, calculate all surrounding values in the window.
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        count++;
+      }
+
+      for (std::size_t precede = 1; precede <= kNumPreceding; ++precede) {
+        // Not the same partition.
+        if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition ||
+            i < precede) {
+          break;
+        }
+
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          count++;
+        }
+      }
+
+      for (int follow = 1; follow <= kNumPreceding; ++follow) {
+        // Not the same partition.
+        if (i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+            i + follow >= kNumTuples) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+        result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  std::unique_ptr<WindowAggregationHandle> handle_avg_;
+};
+
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValues<double>(
+    const std::vector<double*> &expected,
+    const ColumnVector *actual) {
+  EXPECT_TRUE(actual->isNative());
+  const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+  EXPECT_EQ(expected.size(), native->size());
+  for (std::size_t i = 0; i < expected.size(); ++i) {
+    if (expected[i] == nullptr) {
+      EXPECT_TRUE(native->getTypedValue(i).isNull());
+    } else {
+      EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral<double>());
+    }
+  }
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
+  data->interval_ticks = value;
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
+  data->months = value;
+}
+
+typedef WindowAggregationHandleAvgTest WindowAggregationHandleAvgDeathTest;
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkWindowAggregationAvgGeneric<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkWindowAggregationAvgGeneric<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkWindowAggregationAvgGeneric<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkWindowAggregationAvgGeneric<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_DEBUG
+TEST_F(WindowAggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+#endif
+
+TEST_F(WindowAggregationHandleAvgDeathTest, canApplyToTypeTest) {
+  EXPECT_TRUE(CanApplyToTypesTest(kInt));
+  EXPECT_TRUE(CanApplyToTypesTest(kLong));
+  EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+  EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+  EXPECT_FALSE(CanApplyToTypesTest(kChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+  EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));
+  EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, ResultTypeForArgumentTypeTest) {
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_ExecutionHeuristics

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
   // Get input.
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+  window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
   const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new WindowAggregationOperator(query_handle_->query_id(),
+                                        *input_relation_info->relation,
                                         *output_relation,
                                         window_aggr_state_index,
                                         insert_destination_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       glog
-                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_Expression

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
 #include <utility>
 #include <vector>
 
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
 }
 
 WindowAggregateFunctionPtr WindowAggregateFunction::Create(
-    const ::quickstep::AggregateFunction &window_aggregate,
+    const ::quickstep::WindowAggregateFunction &window_aggregate,
     const std::vector<ScalarPtr> &arguments,
     const WindowInfo &window_info,
     const std::string &window_name,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..0cc80df 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,8 +33,8 @@
 
 namespace quickstep {
 
-class AggregateFunction;
 class Type;
+class WindowAggregateFunction;
 
 namespace optimizer {
 namespace expressions {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
    * @return The WindowAggregateFunction singleton (from the expression system)
    *         for this node.
    **/
-  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+  inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
     return window_aggregate_;
   }
 
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
    * @param is_distinct Whether this is a DISTINCT aggregation.
    * @return A new AggregateFunctionPtr.
    **/
-  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+  static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
                                            const std::vector<ScalarPtr> &arguments,
                                            const WindowInfo &window_info,
                                            const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
    * @param window_info The window info of the window aggregate function.
    * @param is_distinct Indicates whether this is a DISTINCT aggregation.
    */
-  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+  WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
                           const std::vector<ScalarPtr> &arguments,
                           const WindowInfo &window_info,
                           const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
   // window_aggregate_. If it really needs to be seperated from the
   // AggregationFunction, a new class for WindowAggregationFunction should be
   // created as quickstep::WindowAggregateFunction.
-  const ::quickstep::AggregateFunction &window_aggregate_;
+  const ::quickstep::WindowAggregateFunction &window_aggregate_;
   std::vector<ScalarPtr> arguments_;
   const WindowInfo window_info_;
   const std::string window_name_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_expressions_tablegenerator_GeneratorFunction
                       quickstep_expressions_tablegenerator_GeneratorFunctionFactory
                       quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_parser_ParseAssignment
                       quickstep_parser_ParseBasicExpressions
                       quickstep_parser_ParseBlockProperties

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..c224388 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
 #include "expressions/table_generator/GeneratorFunction.hpp"
 #include "expressions/table_generator/GeneratorFunctionFactory.hpp"
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "parser/ParseAssignment.hpp"
 #include "parser/ParseBasicExpressions.hpp"
 #include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << "COUNT aggregate has both star (*) and non-star arguments.";
   }
 
-  // Try to look up the AggregateFunction by name using
-  // AggregateFunctionFactory.
-  const ::quickstep::AggregateFunction *aggregate
-      = AggregateFunctionFactory::GetByName(function_name);
-  if (aggregate == nullptr) {
+  // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+  // TODO(Shixuan): We might want to create a new abstract class Function to
+  // include both AggregateFunction and WindowAggregateFunction, which will make
+  // this part of code cleaner.
+  const ::quickstep::AggregateFunction *aggregate = nullptr;
+  const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
+  if (parse_function_call.isWindow()) {
+    window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+  } else {
+    aggregate = AggregateFunctionFactory::GetByName(function_name);
+  }
+
+  if (aggregate == nullptr && window_aggregate == nullptr) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Unrecognized function name \""
         << parse_function_call.name()->value()
@@ -2656,11 +2666,14 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure a naked COUNT() with no arguments wasn't specified.
-  if ((aggregate->getAggregationID() == AggregationID::kCount)
-      && (resolved_arguments.empty())
-      && (!count_star)) {
-    THROW_SQL_ERROR_AT(&parse_function_call)
-        << "COUNT aggregate requires an argument (either scalar or star (*))";
+  if ((aggregate != nullptr &&
+       aggregate->getAggregationID() == AggregationID::kCount) ||
+      (window_aggregate != nullptr &&
+       window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+    if ((resolved_arguments.empty()) && !count_star) {
+      THROW_SQL_ERROR_AT(&parse_function_call)
+          << "COUNT aggregate requires an argument (either scalar or star (*))";
+    }
   }
 
   // Resolve each of the Scalar arguments to the aggregate.
@@ -2670,7 +2683,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure that the aggregate can apply to the specified argument(s).
-  if (!aggregate->canApplyToTypes(argument_types)) {
+  if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+      || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Aggregate function " << aggregate->getName()
         << " can not apply to the given argument(s).";
@@ -2679,7 +2693,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   if (parse_function_call.isWindow()) {
     return resolveWindowAggregateFunction(parse_function_call,
                                           expression_resolution_info,
-                                          aggregate,
+                                          window_aggregate,
                                           resolved_arguments);
   }
 
@@ -2705,7 +2719,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
 E::ScalarPtr Resolver::resolveWindowAggregateFunction(
     const ParseFunctionCall &parse_function_call,
     ExpressionResolutionInfo *expression_resolution_info,
-    const ::quickstep::AggregateFunction *window_aggregate,
+    const ::quickstep::WindowAggregateFunction *window_aggregate,
     const std::vector<E::ScalarPtr> &resolved_arguments) {
   // A window aggregate function might be defined OVER a window name or a window.
   E::WindowAggregateFunctionPtr window_aggregate_function;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
    * @param expression_resolution_info Resolution info that contains the name
    *                                   resolver and info to be updated after
    *                                   resolution.
-   * @param aggregate The aggregate function.
+   * @param aggregate The window aggregate function.
    * @param resolved_arguments The resolved arguments.
    * @return An expression in the query optimizer.
    */
   expressions::ScalarPtr resolveWindowAggregateFunction(
       const ParseFunctionCall &parse_function_call,
       ExpressionResolutionInfo *expression_resolution_info,
-      const ::quickstep::AggregateFunction *aggregate,
+      const ::quickstep::WindowAggregateFunction *aggregate,
       const std::vector<expressions::ScalarPtr> &resolved_arguments);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 16127cc..30a3c39 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -953,19 +953,79 @@ WHERE double_col < 0
 ==
 
 # Window Aggregation Test.
-# Currently this is not supported, an empty table will be returned.
-SELECT avg(int_col) OVER w FROM test
+SELECT char_col, long_col, avg(long_col) OVER w FROM test
 WINDOW w AS
-(PARTITION BY char_col
- ORDER BY long_col DESC NULLS LAST
+(ORDER BY char_col DESC NULLS LAST
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
 --
-+------------------------+
-|avg(int_col)            |
-+------------------------+
-+------------------------+
++--------------------+--------------------+------------------------+
+|char_col            |long_col            |avg(long_col)           |
++--------------------+--------------------+------------------------+
+|          8 2.828427|                  64|                      64|
+|          6 2.449490|                  36|                      50|
+|          4 2.000000|                  16|      38.666666666666664|
+|         24 4.898979|                 576|                     173|
+|         22 4.690416|                 484|      235.19999999999999|
+|         20 4.472136|                 400|      262.66666666666669|
+|          2 1.414214|                   4|      225.71428571428572|
+|         18 4.242641|                 324|                     238|
+|         16 4.000000|                 256|                     240|
+|         14 3.741657|                 196|      235.59999999999999|
+|         12 3.464102|                 144|      227.27272727272728|
+|         10 3.162278|                 100|      216.66666666666666|
+|          0 0.000000|                   0|                     200|
+|         -9 3.000000|                  81|                   191.5|
+|         -7 2.645751|                  49|                     182|
+|         -5 2.236068|                  25|                172.1875|
+|         -3 1.732051|                   9|      162.58823529411765|
+|        -23 4.795832|                 529|      182.94444444444446|
+|        -21 4.582576|                 441|      196.52631578947367|
+|        -19 4.358899|                 361|                  204.75|
+|        -17 4.123106|                 289|      208.76190476190476|
+|        -15 3.872983|                 225|                   209.5|
+|        -13 3.605551|                 169|       207.7391304347826|
+|        -11 3.316625|                 121|                 204.125|
+|         -1 1.000000|                   1|                     196|
++--------------------+--------------------+------------------------+
 ==
 
+SELECT long_col, int_col, avg(int_col) OVER w FROM test
+WINDOW w AS
+(ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
+--
++--------------------+-----------+------------------------+
+|long_col            |int_col    |avg(int_col)            |
++--------------------+-----------+------------------------+
+|                 576|         24|       7.666666666666667|
+|                 529|        -23|                     0.5|
+|                 484|         22|                     0.5|
+|                 441|        -21|                  -10.25|
+|                 400|       NULL|                       0|
+|                 361|        -19|                   -9.75|
+|                 324|         18|                    -0.5|
+|                 289|        -17|     -3.3999999999999999|
+|                 256|         16|      3.2000000000000002|
+|                 225|        -15|                      -3|
+|                 196|         14|      2.7999999999999998|
+|                 169|        -13|     -2.6000000000000001|
+|                 144|         12|                     0.5|
+|                 121|        -11|                   -5.25|
+|                 100|       NULL|                       0|
+|                  81|         -9|                   -4.75|
+|                  64|          8|                    -0.5|
+|                  49|         -7|     -1.3999999999999999|
+|                  36|          6|                     1.2|
+|                  25|         -5|                      -1|
+|                  16|          4|     0.80000000000000004|
+|                   9|         -3|    -0.59999999999999998|
+|                   4|          2|                     0.5|
+|                   1|         -1|    -0.66666666666666663|
+|                   0|       NULL|                     0.5|
++--------------------+-----------+------------------------+
+==
+
+# Currently this is not supported, an empty table will be returned.
 SELECT int_col, sum(float_col) OVER
 (PARTITION BY char_col, long_col
  ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
@@ -987,5 +1047,5 @@ WINDOW w AS
 +------------------------+
 |sum(avg(int_col))       |
 +------------------------+
-|                    NULL|
+|                     -18|
 +------------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
+                      quickstep_storage_WindowAggregationOperationState
                       quickstep_utility_Macros
                       tmb)                      
 target_link_libraries(quickstep_relationaloperators_WorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..3149864 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
 
 #include <vector>
 
+#include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
 
 #include "tmb/id_typedefs.h"
 
@@ -40,10 +42,14 @@ bool WindowAggregationOperator::getAllWorkOrders(
   DCHECK(query_context != nullptr);
 
   if (blocking_dependencies_met_ && !generated_) {
+    std::vector<block_id> relation_blocks =
+        input_relation_.getBlocksSnapshot();
+
     container->addNormalWorkOrder(
         new WindowAggregationWorkOrder(
             query_id_,
             query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+            std::move(relation_blocks),
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     generated_ = true;
@@ -67,6 +73,11 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
                       window_aggregation_state_index_);
+
+  const std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+  for (const block_id bid : relation_blocks) {
+    proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+  }
   proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
                       output_destination_index_);
 
@@ -75,8 +86,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  std::cout << "Window aggregation is not supported yet.\n"
-      << "An empty table is returned\n";
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..bd83248 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of this query.
    * @param input_relation The relation to perform aggregation over.
+   * @param output_relation The relation for output.
    * @param window_aggregation_state_index The index of WindowAggregationState
    *                                       in QueryContext.
    * @param output_destination_index The index of InsertDestination in
    *                                 QueryContext for the output.
    **/
   WindowAggregationOperator(const std::size_t query_id,
+                            const CatalogRelation &input_relation,
                             const CatalogRelation &output_relation,
                             const QueryContext::window_aggregation_state_id window_aggregation_state_index,
                             const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         window_aggregation_state_index_(window_aggregation_state_index),
         output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
    **/
   serialization::WorkOrder* createWorkOrderProto();
 
+  const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
   const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
   const QueryContext::insert_destination_id output_destination_index_;
@@ -117,43 +121,25 @@ class WindowAggregationWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of this query.
    * @param state The WindowAggregationOperatorState to use.
+   * @param block_ids The blocks' id of the input relation.
    * @param output_destination The InsertDestination for output.
    **/
   WindowAggregationWorkOrder(const std::size_t query_id,
                              WindowAggregationOperationState *state,
+                             std::vector<block_id> &&block_ids,
                              InsertDestination *output_destination)
       : WorkOrder(query_id),
         state_(state),
+        block_ids_(std::move(block_ids)),
         output_destination_(output_destination)  {}
 
   ~WindowAggregationWorkOrder() override {}
 
-  /**
-   * @brief Get the pointer to WindowAggregationOperationState.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the window aggregation operation state.
-   **/
-  WindowAggregationOperationState* state() {
-    return state_;
-  }
-
-  /**
-   * @brief Get the pointer to output destination.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the output destination.
-   **/
-  InsertDestination* output_destination() {
-    return output_destination_;
-  }
-
   void execute() override;
 
  private:
   WindowAggregationOperationState *state_;
+  const std::vector<block_id> block_ids_;
   InsertDestination *output_destination_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
   extend WorkOrder {
     // All required
     optional uint32 window_aggr_state_index = 336;
-    optional int32 insert_destination_index = 337;
+    repeated fixed64 block_ids = 337;
+    optional int32 insert_destination_index = 338;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..582effd 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,26 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_ExpressionFactories
                       quickstep_expressions_Expressions_proto
-                      quickstep_expressions_aggregation_AggregateFunction
-                      quickstep_expressions_aggregation_AggregateFunctionFactory
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorUtil
+                      quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
-                      quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_expressions_Expressions_proto
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..0cdfc1a 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,14 +31,21 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/ExpressionFactories.hpp"
 #include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/ColumnVectorUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -46,23 +53,21 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    const AggregateFunction *window_aggregate_function,
+    const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
-    std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
     const bool is_row,
     const std::int64_t num_preceding,
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       arguments_(std::move(arguments)),
-      partition_by_attributes_(std::move(partition_by_attributes)),
       is_row_(is_row),
       num_preceding_(num_preceding),
       num_following_(num_following),
       storage_manager_(storage_manager) {
   // Get the Types of this window aggregate's arguments so that we can create an
   // AggregationHandle.
-  // TODO(Shixuan): Next step: New handles for window aggregation function.
   std::vector<const Type*> argument_types;
   for (const std::unique_ptr<const Scalar> &argument : arguments_) {
     argument_types.emplace_back(&argument->getType());
@@ -71,28 +76,18 @@ WindowAggregationOperationState::WindowAggregationOperationState(
   // Check if window aggregate function could apply to the arguments.
   DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
 
+  // IDs and types of partition keys.
+  std::vector<const Type*> partition_by_types;
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    partition_by_ids_.push_back(
+        partition_by_attribute->getAttributeIdForValueAccessor());
+    partition_by_types.push_back(&partition_by_attribute->getType());
+  }
+
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(argument_types));
-  window_aggregation_state_.reset(
-      window_aggregation_handle_->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // See if all of this window aggregate's arguments are attributes in the input
-  // relation. If so, remember the attribute IDs so that we can do copy elision
-  // when actually performing the window aggregation.
-  arguments_as_attributes_.reserve(arguments_.size());
-  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
-    const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
-    if (argument_id == -1) {
-      arguments_as_attributes_.clear();
-      break;
-    } else {
-      DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
-      arguments_as_attributes_.push_back(argument_id);
-    }
-  }
-#endif
+      window_aggregate_function->createHandle(std::move(argument_types),
+                                              std::move(partition_by_types)));
 }
 
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -101,10 +96,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
     StorageManager *storage_manager) {
   DCHECK(ProtoIsValid(proto, database));
 
-  // Rebuild contructor arguments from their representation in 'proto'.
-  const AggregateFunction *aggregate_function
-      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
-
   std::vector<std::unique_ptr<const Scalar>> arguments;
   arguments.reserve(proto.arguments_size());
   for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
@@ -126,10 +117,10 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
 
-  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
-                                             aggregate_function,
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+                                             &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
                                              std::move(arguments),
-                                             std::move(partition_by_attributes),
+                                             partition_by_attributes,
                                              is_row,
                                              num_preceding,
                                              num_following,
@@ -139,11 +130,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
                                                    const CatalogDatabaseLite &database) {
   if (!proto.IsInitialized() ||
-      !database.hasRelationWithId(proto.relation_id())) {
+      !database.hasRelationWithId(proto.input_relation_id())) {
     return false;
   }
 
-  if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+  if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
     return false;
   }
 
@@ -176,4 +167,122 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
   return true;
 }
 
+void WindowAggregationOperationState::windowAggregateBlocks(
+    InsertDestination *output_destination,
+    const std::vector<block_id> &block_ids) {
+  // TODO(Shixuan): This is a quick fix for currently unsupported functions in
+  // order to pass the query_optimizer test.
+  if (window_aggregation_handle_.get() == nullptr) {
+    std::cout << "The function will be supported in the near future :)\n";
+    return;
+  }
+
+  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
+  // work. This will need Order Key to be passed so that we know where the
+  // window should start and end.
+  if (!is_row_) {
+    std::cout << "Currently we don't support RANGE frame mode :(\n";
+    return;
+  }
+
+  // Get the total number of tuples.
+  int num_tuples = 0;
+  for (const block_id block_idx : block_ids) {
+    num_tuples +=
+        storage_manager_->getBlock(block_idx, input_relation_)->getNumTuples();
+  }
+
+  // Construct column vectors for attributes.
+  std::vector<ColumnVector*> attribute_vecs;
+  for (std::size_t attr_id = 0; attr_id < input_relation_.size(); ++attr_id) {
+    const CatalogAttribute *attr = input_relation_.getAttributeById(attr_id);
+    const Type &type = attr->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      attribute_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      attribute_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // Construct column vectors for arguments.
+  std::vector<ColumnVector*> argument_vecs;
+  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+    const Type &type = argument->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      argument_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      argument_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // TODO(Shixuan): Add Support for Vector Copy Elision Selection.
+  // Add tuples and arguments into ColumnVectors.
+  for (const block_id block_idx : block_ids) {
+    BlockReference block = storage_manager_->getBlock(block_idx, input_relation_);
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
+    ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+    for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+      argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
+                                                          &sub_block_ref));
+    }
+
+    InvokeOnAnyValueAccessor(tuple_accessor,
+                             [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
+      tuple_accessor->beginIteration();
+      argument_accessor->beginIteration();
+
+      while (tuple_accessor->next() && argument_accessor->next()) {
+        for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
+          ColumnVector *attr_vec = attribute_vecs[attr_id];
+          if (attr_vec->isNative()) {
+            static_cast<NativeColumnVector*>(attr_vec)->appendTypedValue(
+                tuple_accessor->getTypedValue(attr_id));
+          } else {
+            static_cast<IndirectColumnVector*>(attr_vec)->appendTypedValue(
+                tuple_accessor->getTypedValue(attr_id));
+          }
+        }
+
+        for (std::size_t argument_idx = 0;
+             argument_idx < argument_vecs.size();
+             ++argument_idx) {
+          ColumnVector *argument = argument_vecs[argument_idx];
+          if (argument->isNative()) {
+            static_cast<NativeColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          } else {
+            static_cast<IndirectColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          }
+        }
+      }
+    });
+  }
+
+  // Construct the value accessor for tuples in all blocks
+  ColumnVectorsValueAccessor *all_blocks_accessor
+      = new ColumnVectorsValueAccessor();
+  for (ColumnVector *attr_vec : attribute_vecs) {
+    all_blocks_accessor->addColumn(attr_vec);
+  }
+
+  // Do actual calculation in handle.
+  ColumnVector *window_aggregates =
+      window_aggregation_handle_->calculate(all_blocks_accessor,
+                                            std::move(argument_vecs),
+                                            partition_by_ids_,
+                                            is_row_,
+                                            num_preceding_,
+                                            num_following_);
+
+  all_blocks_accessor->addColumn(window_aggregates);
+  output_destination->bulkInsertTuples(all_blocks_accessor);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..9792a99 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,20 +25,20 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WindowAggregateFunction;
 
 /** \addtogroup Storage
  *  @{
@@ -63,13 +63,12 @@ class WindowAggregationOperationState {
    *                      current row. -1 means UNBOUNDED PRECEDING.
    * @param num_following The number of rows/range for the tuples following the
    *                      current row. -1 means UNBOUNDED FOLLOWING.
-   * @param storage_manager The StorageManager to use for allocating hash
-   *        tables.
+   * @param storage_manager The StorageManager to get block references.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
-                                  const AggregateFunction *window_aggregate_function,
+                                  const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
-                                  std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+                                  const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
                                   const bool is_row,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following,
@@ -107,66 +106,29 @@ class WindowAggregationOperationState {
                            const CatalogDatabaseLite &database);
 
   /**
-   * @brief Get the is_row info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   * 
-   * @return True if the frame mode is ROW, false if it is RANGE.
-   **/
-  const bool is_row() const { return is_row_; }
-
-  /**
-   * @brief Get the num_preceding info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return The number of rows/range that precedes the current row.
-   **/
-  const std::int64_t num_preceding() const { return num_preceding_; }
-
-  /**
-   * @brief Get the num_following info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
+   * @brief Compute window aggregates on the tuples of the given relation.
    *
-   * @return The number of rows/range that follows the current row.
+   * @param output_destination The output destination for the computed window
+   *                           aggregate.
+   * @param block_ids The id of the blocks to be computed.
    **/
-  const std::int64_t num_following() const { return num_following_; }
-
-  /**
-   * @brief Get the pointer to StorageManager.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the storage manager.
-   **/
-  StorageManager *storage_manager() { return storage_manager_; }
+  void windowAggregateBlocks(InsertDestination *output_destination,
+                             const std::vector<block_id> &block_ids);
 
  private:
   const CatalogRelationSchema &input_relation_;
-
-  // TODO(Shixuan): Handle and State for window aggregation will be needed for
-  //                actual calculation.
-  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
-  std::unique_ptr<AggregationState> window_aggregation_state_;
+  const std::vector<block_id> block_ids_;
+  std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
   std::vector<std::unique_ptr<const Scalar>> arguments_;
+  std::vector<attribute_id> partition_by_ids_;
 
-  // We don't add order_by_attributes here since it is not needed after sorting.
-  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
-  // Window framing information.
+  // Frame info.
   const bool is_row_;
   const std::int64_t num_preceding_;
   const std::int64_t num_following_;
 
   StorageManager *storage_manager_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<attribute_id> arguments_as_attributes_;
-#endif
-
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..d888461 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,12 +19,12 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
 import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
-  required int32 relation_id = 1;
-  required AggregateFunction function = 2;
+  required int32 input_relation_id = 1;
+  required WindowAggregateFunction function = 2;
   repeated Scalar arguments = 3;
   repeated Scalar partition_by_attributes = 4;
   required bool is_row = 5;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(kInvalidTableId);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(kInvalidTableId);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kInvalidNum);
   proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);