You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/26 16:51:55 UTC

incubator-quickstep git commit: - Supported RANGE mode for window aggregation. - Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation [created] ff513a6dc


- Supported RANGE mode for window aggregation.
- Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ff513a6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ff513a6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ff513a6d

Branch: refs/heads/SQL-window-aggregation
Commit: ff513a6dceefcb491f48e48573281d371f8a2654
Parents: c0bb462
Author: shixuan <sh...@apache.org>
Authored: Tue Jul 26 11:49:07 2016 -0500
Committer: shixuan <sh...@apache.org>
Committed: Tue Jul 26 11:49:07 2016 -0500

----------------------------------------------------------------------
 expressions/window_aggregation/CMakeLists.txt   |  11 +-
 .../WindowAggregateFunction.hpp                 |   5 +-
 .../WindowAggregateFunctionAvg.cpp              |   4 +-
 .../WindowAggregateFunctionAvg.hpp              |   3 +-
 .../WindowAggregateFunctionCount.cpp            |   4 +-
 .../WindowAggregateFunctionCount.hpp            |   3 +-
 .../WindowAggregateFunctionMax.cpp              |   3 +-
 .../WindowAggregateFunctionMax.hpp              |   3 +-
 .../WindowAggregateFunctionMin.cpp              |   3 +-
 .../WindowAggregateFunctionMin.hpp              |   3 +-
 .../WindowAggregateFunctionSum.cpp              |   3 +-
 .../WindowAggregateFunctionSum.hpp              |   3 +-
 .../WindowAggregationHandle.cpp                 | 145 ++++++++++++++
 .../WindowAggregationHandle.hpp                 |  84 ++++++--
 .../WindowAggregationHandleAvg.cpp              | 198 +++++++------------
 .../WindowAggregationHandleAvg.hpp              |  19 +-
 .../WindowAggregationHandleAvg_unittest.cpp     | 181 ++++++++++++++---
 query_optimizer/ExecutionGenerator.cpp          |  11 +-
 query_optimizer/resolver/Resolver.cpp           |  19 +-
 .../tests/execution_generator/Select.test       |  41 +++-
 storage/WindowAggregationOperationState.cpp     |  60 ++++--
 storage/WindowAggregationOperationState.hpp     |   3 +
 storage/WindowAggregationOperationState.proto   |   1 +
 23 files changed, 581 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
index 6a16fcc..9e06897 100644
--- a/expressions/window_aggregation/CMakeLists.txt
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -44,7 +44,7 @@ add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
             WindowAggregateFunctionSum.cpp
             WindowAggregateFunctionSum.hpp)
 add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
-            ../../empty_src.cpp
+            WindowAggregationHandle.cpp
             WindowAggregationHandle.hpp)
 add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
             WindowAggregationHandleAvg.cpp
@@ -131,9 +131,15 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
                       quickstep_types_operations_comparisons_Comparison
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
@@ -141,8 +147,6 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
 target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_scalar_Scalar
-                      quickstep_expressions_scalar_ScalarAttribute
                       quickstep_expressions_windowaggregation_WindowAggregationHandle
                       quickstep_storage_ValueAccessor
                       quickstep_types_Type
@@ -183,7 +187,6 @@ target_link_libraries(WindowAggregationHandle_tests
                       quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_expressions_windowaggregation_WindowAggregationHandle
-                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
                       quickstep_expressions_windowaggregation_WindowAggregationID
                       quickstep_storage_ValueAccessor
                       quickstep_types_CharType

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index e40479b..d56b4ab 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -122,6 +122,8 @@ class WindowAggregateFunction {
    *        arguments to this WindowAggregateFunction.
    * @param partition_key_types A list or zero or more Types for partition keys
    *                            to this WindowAggregateFunction.
+   * @param first_order_key_type A pointer to the first order key type, nullptr
+   *                             if not needed.
    * 
    * @return A new WindowAggregationHandle that can be used to compute this
    *         WindowAggregateFunction over the specified argument_types. Caller
@@ -129,7 +131,8 @@ class WindowAggregateFunction {
    **/
   virtual WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const = 0;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const = 0;
 
  protected:
   explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index bc31a53..6c1b139 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,12 +73,14 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
       << " that AVG can not be applied to.";
 
   return new WindowAggregationHandleAvg(partition_key_types,
+                                        first_order_key_type,
                                         *argument_types.front());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 32fd9d5..b5e91fd 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,8 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const override;
 
  private:
   WindowAggregateFunctionAvg()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
index 504e000..7a24813 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -47,12 +47,12 @@ const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleCount for argument Types "
       << "that COUNT can not be applied to (> 1 argument).";
 
-  // TODO(Shixuan): Add handle for Count.
   return nullptr;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
index 1b40fdd..f34af40 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -58,7 +58,8 @@ class WindowAggregateFunctionCount : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const override;
 
  private:
   WindowAggregateFunctionCount()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
index f3997c7..ab8a727 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -55,7 +55,8 @@ const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
       << "that MAX can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
index 00c788e..f865ec7 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -58,7 +58,8 @@ class WindowAggregateFunctionMax : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const override;
 
  private:
   WindowAggregateFunctionMax()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
index a13e28e..579447f 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -55,7 +55,8 @@ const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
       << "that MIN can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
index aeba539..1b02da1 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -58,7 +58,8 @@ class WindowAggregateFunctionMin : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const override;
 
  private:
   WindowAggregateFunctionMin()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
index 636c53a..d3e089e 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -71,7 +71,8 @@ const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
       << "that SUM can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
index 047113c..4963040 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -58,7 +58,8 @@ class WindowAggregateFunctionSum : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type) const override;
 
  private:
   WindowAggregateFunctionSum()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregationHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.cpp b/expressions/window_aggregation/WindowAggregationHandle.cpp
new file mode 100644
index 0000000..578711f
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.cpp
@@ -0,0 +1,145 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Sets up the comparators and operators shared by window aggregation handles.
+//
+// partition_key_types:  one Type per partition key; an equality comparator is
+//                       created for each, in the same order.
+// first_order_key_type: Type of the first order key, or nullptr when no range
+//                       checking is needed. When non-null, the add operator
+//                       and "less than or equal to" comparator used by
+//                       inRange() are created as well.
+WindowAggregationHandle::WindowAggregationHandle(
+    const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_key_type)
+    : range_compare_type_(nullptr),
+      first_order_key_type_(first_order_key_type) {
+  // Both pointer members are initialized above so that neither is left
+  // indeterminate when first_order_key_type is nullptr.
+
+  // Comparison operators for checking if two tuples belong to the same partition.
+  equal_comparators_.reserve(partition_key_types.size());
+  for (const Type *partition_key_type : partition_key_types) {
+    equal_comparators_.emplace_back(
+        ComparisonFactory::GetComparison(ComparisonID::kEqual)
+            .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
+  }
+
+  // If in RANGE mode, need comparators and operators to check window frame.
+  if (first_order_key_type != nullptr) {
+    // Boundaries are computed as (order key + long offset), so comparisons are
+    // done in the type that unifies the order key type with the long type.
+    const Type &long_type = TypeFactory::GetType(kLong, false);
+    range_compare_type_ =
+        TypeFactory::GetUnifyingType(*first_order_key_type, long_type);
+
+    range_add_operator_.reset(
+        BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+            .makeUncheckedBinaryOperatorForTypes(*first_order_key_type, long_type));
+    range_comparator_.reset(
+        ComparisonFactory::GetComparison(ComparisonID::kLessOrEqual)
+            .makeUncheckedComparatorForTypes(*range_compare_type_, *range_compare_type_));
+  }
+}
+
+// Returns true when the tuple at test_tuple_id has the same value as the
+// accessor's current tuple on every partition key, false otherwise. A
+// test_tuple_id outside [0, number of tuples) is never in the same partition.
+bool WindowAggregationHandle::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  // Guard: the test tuple has to exist in the accessor.
+  const bool test_tuple_exists =
+      test_tuple_id >= 0 &&
+      test_tuple_id < tuple_accessor->getNumTuples();
+  if (!test_tuple_exists) {
+    return false;
+  }
+
+  // equal_comparators_ is indexed in parallel with partition_by_ids; the first
+  // key that differs decides the answer.
+  const std::size_t num_keys = partition_by_ids.size();
+  for (std::size_t key_idx = 0; key_idx < num_keys; ++key_idx) {
+    const attribute_id key_id = partition_by_ids[key_idx];
+    const bool keys_match =
+        equal_comparators_[key_idx]->compareTypedValues(
+            tuple_accessor->getTypedValue(key_id),
+            tuple_accessor->getTypedValueAtAbsolutePosition(key_id, test_tuple_id));
+    if (!keys_match) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+// Returns true when the tuple at test_tuple_id falls inside the value range
+// [current + num_preceding, current + num_following] of the first order key,
+// where "current" is the accessor's current tuple. num_preceding is expected
+// to be already negated by the caller. A bound is only enforced when
+// num_preceding < 1 (start) or num_following > -1 (end); other values are
+// treated as UNBOUNDED.
+bool WindowAggregationHandle::inRange(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id,
+    const attribute_id first_order_by_id,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  // Guard: the test tuple has to exist in the accessor.
+  const bool test_tuple_exists =
+      test_tuple_id >= 0 &&
+      test_tuple_id < tuple_accessor->getNumTuples();
+  if (!test_tuple_exists) {
+    return false;
+  }
+
+  // The current tuple always belongs to its own window frame.
+  if (tuple_accessor->getCurrentPosition() == test_tuple_id) {
+    return true;
+  }
+
+  // Run the test tuple's order key through the same add operator as the
+  // boundaries (adding a zero offset) before comparing.
+  const Type &offset_type = TypeFactory::GetType(kLong, false);
+  const TypedValue candidate_key =
+      range_add_operator_->applyToTypedValues(
+          tuple_accessor->getTypedValueAtAbsolutePosition(first_order_by_id, test_tuple_id),
+          offset_type.makeZeroValue());
+
+  // A NULL order key on either tuple is considered not in range.
+  if (candidate_key.isNull() ||
+      tuple_accessor->getTypedValue(first_order_by_id).isNull()) {
+    return false;
+  }
+
+  // Start boundary: frame_start <= candidate_key, unless unbounded.
+  bool at_or_after_start = true;
+  if (num_preceding <= 0) {  // num_preceding is negated.
+    const TypedValue frame_start =
+        range_add_operator_->applyToTypedValues(
+            tuple_accessor->getTypedValue(first_order_by_id),
+            offset_type.makeValue(&num_preceding));
+    at_or_after_start =
+        range_comparator_->compareTypedValues(frame_start, candidate_key);
+  }
+
+  // End boundary: candidate_key <= frame_end, unless unbounded.
+  bool at_or_before_end = true;
+  if (num_following >= 0) {
+    const TypedValue frame_end =
+        range_add_operator_->applyToTypedValues(
+            tuple_accessor->getTypedValue(first_order_by_id),
+            offset_type.makeValue(&num_following));
+    at_or_before_end =
+        range_comparator_->compareTypedValues(candidate_key, frame_end);
+  }
+
+  return at_or_after_start && at_or_before_end;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 65f95d9..5595e11 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -27,19 +27,22 @@
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class InsertDestinationInterface;
-class Scalar;
-class StorageManager;
 class Type;
 class ValueAccessor;
 
@@ -55,27 +58,29 @@ class ValueAccessor;
  *
  * A WindowAggregationHandle is created by calling
  * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
- * provides methods that are used to actually compute the window aggregate,
- * storing intermediate results in WindowAggregationState objects.
+ * provides methods that are used to actually compute the window aggregate.
  *
  * The work flow for computing a window aggregate is:
- *     1. Create an initial state by createInitialState().
  *     2. One thread will handle all the computation, iterating from the first
  *        tuple to the last tuple. Note there will be two modes that could be
  *        used upon different situations:
  *        a. If the window aggregate is defined as accumulative, which are:
  *           i.  Functions applied to whole partition, such as rank(), ntile()
- *               and dense_rank().
+ *               and dense_rank(). (Not implemented yet).
  *           ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
  *               AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
  *               FOLLOWING".
  *           Then, for functions except median, we could store some global
- *           values in the state without keeping all the tuple values around.
+ *           values without keeping all the tuple values around. For simplicity,
+ *           in avg(), count() and sum(), we treat the accumulative one as
+ *           sliding window since the time complexity does not vary.
  *        b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
- *           3 FOLLOWING", we have to store all the tuples in the state so that
+ *           3 FOLLOWING", we have to store all the tuples in the state (at
+ *           least two pointers to the start tuple and end tuple), so that
  *           we could know which values should be dropped as the window slides.
- *        For each computed value, generate a tuple store in the column vector.
- *     3. Insert the new column into the original relation and return.
+ *        For each computed value, generate a TypedValue store in a ColumnVector
+ *        for window aggregate values.
+ *     3. Return the result ColumnVector.
  *
  * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
  * parallelization is to calculate the partial result inside each block. Each
@@ -97,6 +102,7 @@ class WindowAggregationHandle {
    * @param block_accessors A pointer to the value accessor of block attributes.
    * @param arguments The ColumnVectors of arguments
    * @param partition_by_ids The ids of partition keys.
+   * @param order_by_ids The ids of order keys.
    * @param is_row True if the frame mode is ROWS, false if it is RANGE.
    * @param num_preceding The number of rows/range that precedes the current row.
    * @param num_following The number of rows/range that follows the current row.
@@ -104,8 +110,9 @@ class WindowAggregationHandle {
    * @return A ColumnVector of the calculated window aggregates.
    **/
   virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
-                                  std::vector<ColumnVector*> &&arguments,
+                                  const std::vector<ColumnVector*> &arguments,
                                   const std::vector<attribute_id> &partition_by_ids,
+                                  const std::vector<attribute_id> &order_by_ids,
                                   const bool is_row,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following) const = 0;
@@ -115,18 +122,53 @@ class WindowAggregationHandle {
    * @brief Constructor.
    *
    * @param partition_key_types The Types of the partition key.
+   * @param first_order_key_type The type of the first order key, nullptr if
+   *                             not needed.
    **/
-  explicit WindowAggregationHandle(
-      const std::vector<const Type*> &partition_key_types) {
-    // Comparison operators for checking if two tuples belong to the same partition.
-    for (const Type *partition_key_type : partition_key_types) {
-      equal_comparators_.emplace_back(
-          ComparisonFactory::GetComparison(ComparisonID::kEqual)
-              .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
-    }
-  }
+  WindowAggregationHandle(
+      const std::vector<const Type*> &partition_key_types,
+      const Type *first_order_key_type);
+
+  /**
+   * @brief Check if test tuple is in the same partition as the current
+   *        tuple in the accessor.
+   *
+   * @param tuple_accessor The ValueAccessor for tuples.
+   * @param test_tuple_id The id of the test tuple.
+   * @param partition_by_ids The id of partition keys.
+   *
+   * @return True if test tuple is in the same partition as the current tuple in
+   *         the accessor, false if not.
+   **/
+  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+                     const tuple_id test_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  /**
+   * @brief Check if test tuple is in the defined range.
+   *
+   * @param tuple_accessor The ValueAccessor for tuples.
+   * @param test_tuple_id The id of the test tuple.
+   * @param first_order_by_id The id of first order key.
+   * @param num_preceding The value difference between the window start boundary
+   *                      and current value. This value has to be negated.
+   * @param num_following The value difference between the window end boundary
+   *                      and current value.
+   *
+   * @return True if test tuple is in the defined range, false if not.
+   **/
+  bool inRange(const ColumnVectorsValueAccessor *tuple_accessor,
+               const tuple_id test_tuple_id,
+               const attribute_id first_order_by_id,
+               const std::int64_t num_preceding,
+               const std::int64_t num_following) const;
 
   std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+  std::unique_ptr<UncheckedBinaryOperator> range_add_operator_;
+  // A "less than or equal to" comparator will be defined.
+  std::unique_ptr<UncheckedComparator> range_comparator_;
+  const Type* range_compare_type_;
+  const Type *first_order_key_type_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index a6a10d4..f1e1ada 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -24,8 +24,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -43,8 +42,9 @@ namespace quickstep {
 
 WindowAggregationHandleAvg::WindowAggregationHandleAvg(
     const std::vector<const Type*> &partition_key_types,
+    const Type *first_order_by_key_type,
     const Type &type)
-    : WindowAggregationHandle(partition_key_types),
+    : WindowAggregationHandle(partition_key_types, first_order_by_key_type),
       argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
@@ -77,6 +77,12 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
   fast_add_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
           .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
+  // Subtract operator for dropping argument values off the window.
+  fast_subtract_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract)
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
   // Divide operator for dividing sum by count to get final average.
   divide_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
@@ -85,8 +91,9 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
 
 ColumnVector* WindowAggregationHandleAvg::calculate(
     ColumnVectorsValueAccessor *tuple_accessor,
-    std::vector<ColumnVector*> &&arguments,
+    const std::vector<ColumnVector*> &arguments,
     const std::vector<attribute_id> &partition_by_ids,
+    const std::vector<attribute_id> &order_by_ids,
     const bool is_row,
     const std::int64_t num_preceding,
     const std::int64_t num_following) const {
@@ -98,144 +105,83 @@ ColumnVector* WindowAggregationHandleAvg::calculate(
   // Initialize the output column and argument accessor.
   NativeColumnVector *window_aggregates =
       new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
-  ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
+
+  ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
   argument_accessor->addColumn(arguments[0]);
 
+  // Initialize the information about the window.
+  TypedValue sum = sum_type_->makeZeroValue();
+  std::uint64_t count = 0;
+  tuple_id start_tuple_id = 0;  // The id of the first tuple in the window.
+  tuple_id end_tuple_id = 0;  // The id of the tuple just past the last tuple
+                              // in the window (i.e. an exclusive end).
+
   // Create a window for each tuple and calculate the window aggregate.
   tuple_accessor->beginIteration();
   argument_accessor->beginIteration();
 
   while (tuple_accessor->next() && argument_accessor->next()) {
-    const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
-                                                                 argument_accessor,
-                                                                 partition_by_ids,
-                                                                 is_row,
-                                                                 num_preceding,
-                                                                 num_following);
-    window_aggregates->appendTypedValue(window_aggregate);
-  }
-
-  return window_aggregates;
-}
-
-TypedValue WindowAggregationHandleAvg::calculateOneWindow(
-    ColumnVectorsValueAccessor *tuple_accessor,
-    ColumnVectorsValueAccessor *argument_accessor,
-    const std::vector<attribute_id> &partition_by_ids,
-    const bool is_row,
-    const std::int64_t num_preceding,
-    const std::int64_t num_following) const {
-  // Initialize.
-  TypedValue sum = sum_type_->makeZeroValue();
-  TypedValue current_value = argument_accessor->getTypedValue(0);
-  std::uint64_t count = 0;
-
-  // Ignore the value if null.
-  if (!current_value.isNull()) {
-    sum = fast_add_operator_->applyToTypedValues(sum, current_value);
-    count++;
-  }
-
-  // Get the partition key for the current row.
-  std::vector<TypedValue> current_row_partition_key;
-  for (attribute_id partition_by_id : partition_by_ids) {
-    current_row_partition_key.push_back(
-        tuple_accessor->getTypedValue(partition_by_id));
-  }
-
-  // Get current position.
-  tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
-
-  // Find preceding tuples.
-  int count_preceding = 0;
-  tuple_id preceding_tuple_id = current_tuple_id;
-  while (num_preceding == -1 || count_preceding < num_preceding) {
-    preceding_tuple_id--;
-
-    // No more preceding tuples.
-    if (preceding_tuple_id < 0) {
-      break;
-    }
-
-    // Get the partition keys and compare. If not the same partition as the
-    // current row, stop searching preceding tuples.
-    if (!samePartition(tuple_accessor,
-                       current_row_partition_key,
-                       preceding_tuple_id,
-                       partition_by_ids)) {
-      break;
-    }
-
-    // Actually count the element and do the calculation.
-    count_preceding++;
-    TypedValue preceding_value =
-        argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
-
-    // Ignore the value if null.
-    if (!preceding_value.isNull()) {
-      sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
-      count++;
-    }
-  }
-
-  // Find following tuples.
-  int count_following = 0;
-  tuple_id following_tuple_id = current_tuple_id;
-  while (num_following == -1 || count_following < num_following) {
-    following_tuple_id++;
-
-    // No more following tuples.
-    if (following_tuple_id == tuple_accessor->getNumTuples()) {
-      break;
+    tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+    // If current tuple is not in the same partition as the previous tuple,
+    // reset the window.
+    if (!samePartition(tuple_accessor, current_tuple_id - 1, partition_by_ids)) {
+      start_tuple_id = current_tuple_id;
+      end_tuple_id = current_tuple_id;
+      count = 0;
+      sum = sum_type_->makeZeroValue();
     }
 
-    // Get the partition keys and compare. If not the same partition as the
-    // current row, stop searching preceding tuples.
-    if (!samePartition(tuple_accessor,
-                       current_row_partition_key,
-                       following_tuple_id,
-                       partition_by_ids)) {
-      break;
+    // Drop tuples that will be out of the window from the beginning.
+    while (num_preceding != -1 &&
+           ((is_row && current_tuple_id - start_tuple_id > num_preceding) ||
+            (!is_row && !inRange(tuple_accessor,
+                                 start_tuple_id,
+                                 order_by_ids[0],
+                                 -num_preceding,
+                                 num_following)))) {
+      TypedValue start_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, start_tuple_id);
+      // Ignore the value if NULL.
+      if (!start_value.isNull()) {
+        sum = fast_subtract_operator_->applyToTypedValues(sum, start_value);
+        count--;
+      }
+
+      start_tuple_id++;
     }
 
-    // Actually count the element and do the calculation.
-    count_following++;
-    TypedValue following_value =
-        argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
-
-    // Ignore the value if null.
-    if (!following_value.isNull()) {
-      sum = fast_add_operator_->applyToTypedValues(sum, following_value);
-      count++;
+    // Add tuples that will be included by the window at the end.
+    while (samePartition(tuple_accessor, end_tuple_id, partition_by_ids) &&
+           (num_following == -1 ||
+            (is_row && end_tuple_id - current_tuple_id <= num_following) ||
+            (!is_row && inRange(tuple_accessor,
+                                end_tuple_id,
+                                order_by_ids[0],
+                                -num_preceding,
+                                num_following)))) {
+      TypedValue end_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, end_tuple_id);
+
+      // Ignore the value if NULL.
+      if (!end_value.isNull()) {
+        sum = fast_add_operator_->applyToTypedValues(sum, end_value);
+        count++;
+      }
+
+      end_tuple_id++;
     }
-  }
 
-  // If all values are NULLs, return NULL; Otherwise, return the quotient.
-  if (count == 0) {
-    return result_type_->makeNullValue();
-  } else {
-    return divide_operator_->applyToTypedValues(sum,
-                                                TypedValue(static_cast<double>(count)));
-  }
-}
-
-bool WindowAggregationHandleAvg::samePartition(
-    const ColumnVectorsValueAccessor *tuple_accessor,
-    const std::vector<TypedValue> &current_row_partition_key,
-    const tuple_id boundary_tuple_id,
-    const std::vector<attribute_id> &partition_by_ids) const {
-  for (std::size_t partition_by_index = 0;
-       partition_by_index < partition_by_ids.size();
-       ++partition_by_index) {
-    if (!equal_comparators_[partition_by_index]->compareTypedValues(
-            current_row_partition_key[partition_by_index],
-            tuple_accessor->getTypedValueAtAbsolutePosition(
-                partition_by_ids[partition_by_index], boundary_tuple_id))) {
-      return false;
+    // If all values are NULLs, return NULL; Otherwise, return the quotient.
+    if (count == 0) {
+      window_aggregates->appendTypedValue(result_type_->makeNullValue());
+    } else {
+      window_aggregates->appendTypedValue(
+          divide_operator_->applyToTypedValues(sum, TypedValue(static_cast<double>(count))));
     }
   }
 
-  return true;
+  return window_aggregates;
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 5b41779..980f25b 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -31,7 +31,6 @@
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -54,8 +53,9 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
   ~WindowAggregationHandleAvg() override {}
 
   ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
-                          std::vector<ColumnVector*> &&arguments,
+                          const std::vector<ColumnVector*> &arguments,
                           const std::vector<attribute_id> &partition_by_ids,
+                          const std::vector<attribute_id> &order_by_ids,
                           const bool is_row,
                           const std::int64_t num_preceding,
                           const std::int64_t num_following) const override;
@@ -70,25 +70,14 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
    * @param type Type of the avg value.
    **/
   WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
+                             const Type *first_order_key_type,
                              const Type &type);
 
-  TypedValue calculateOneWindow(
-      ColumnVectorsValueAccessor *tuple_accessor,
-      ColumnVectorsValueAccessor *argument_accessor,
-      const std::vector<attribute_id> &partition_by_ids,
-      const bool is_row,
-      const std::int64_t num_preceding,
-      const std::int64_t num_following) const;
-
-  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
-                     const std::vector<TypedValue> &current_row_partition_key,
-                     const tuple_id boundary_tuple_id,
-                     const std::vector<attribute_id> &partition_by_ids) const;
-
   const Type &argument_type_;
   const Type *sum_type_;
   const Type *result_type_;
   std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> fast_subtract_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index c044a98..98c2a78 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -27,7 +27,6 @@
 #include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "expressions/window_aggregation/WindowAggregationHandle.hpp"
-#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
 #include "expressions/window_aggregation/WindowAggregationID.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/CharType.hpp"
@@ -58,6 +57,9 @@ namespace {
   constexpr int kNullInterval = 25;
   constexpr int kNumPreceding = 2;
   constexpr int kNumFollowing = 2;
+  constexpr int kPartitionKeyIndex = 0;
+  constexpr int kOrderKeyIndex = 1;
+  constexpr int kNumTuplesPerOrderKey = 2;
 
 }  // namespace
 
@@ -70,7 +72,8 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
         WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
     std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
     handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
-                                            std::move(partition_key_types)));
+                                            partition_key_types,
+                                            &(TypeFactory::GetType(kInt, false))));
   }
 
   // Test canApplyToTypes().
@@ -101,6 +104,7 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
 
     EXPECT_EQ(expected.size(), native->size());
     for (std::size_t i = 0; i < expected.size(); ++i) {
+      std::cout << i << '\n';
       if (expected[i] == nullptr) {
         EXPECT_TRUE(native->getTypedValue(i).isNull());
       } else {
@@ -126,15 +130,19 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
     ColumnVector *argument_type_vector =
         createArgumentGeneric<GenericType>(&argument_cpp_vector);
     NativeColumnVector *partition_key_vector =
-        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
+    NativeColumnVector *order_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
 
     for (int i = 0; i < kNumTuples; ++i) {
       partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+      order_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerOrderKey));
     }
 
     // Create tuple ValueAccessor.
     ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
     tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(order_key_vector);
     tuple_accessor->addColumn(argument_type_vector);
 
     // Test UNBOUNDED PRECEDING AND CURRENT ROW.
@@ -183,44 +191,99 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
     std::vector<ColumnVector*> arguments;
     arguments.push_back(argument_type_vector);
     // The partition key index is 0.
-    std::vector<attribute_id> partition_key(1, 0);
+    std::vector<attribute_id> partition_key(1, kPartitionKeyIndex);
+    std::vector<attribute_id> order_key(1, kOrderKeyIndex);
 
-    ColumnVector *result =
+    // Check ROWS mode.
+    ColumnVector *rows_result =
         handle_avg_->calculate(tuple_accessor,
-                               std::move(arguments),
+                               arguments,
                                partition_key,
+                               order_key,
                                true  /* is_row */,
                                -1  /* num_preceding: UNBOUNDED PRECEDING */,
                                0  /* num_following: CURRENT ROW */);
 
     // Get the cpptype result.
-    std::vector<typename OutputType::cpptype*> result_cpp_vector;
-    typename GenericType::cpptype sum;
-    int count;
+    std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
+    typename GenericType::cpptype rows_sum;
+    int rows_count;
     for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
       // Start of new partition
       if (i % kNumTuplesPerPartition == 0) {
-        SetDataType(0, &sum);
-        count = 0;
+        SetDataType(0, &rows_sum);
+        rows_count = 0;
       }
 
       typename GenericType::cpptype *value = argument_cpp_vector[i];
       if (value != nullptr) {
-        sum += *value;
-        count++;
+        rows_sum += *value;
+        rows_count++;
       }
 
-      if (count == 0) {
-        result_cpp_vector.push_back(nullptr);
+      if (rows_count == 0) {
+        rows_result_cpp_vector.push_back(nullptr);
       } else {
         typename OutputType::cpptype *result_cpp_value =
             new typename OutputType::cpptype;
-        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
-        result_cpp_vector.push_back(result_cpp_value);
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(rows_sum) / rows_count;
+        rows_result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+    // Check RANGE mode.
+    ColumnVector *range_result =
+        handle_avg_->calculate(tuple_accessor,
+                               arguments,
+                               partition_key,
+                               order_key,
+                               false  /* is_row */,
+                               -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                               0  /* num_following: CURRENT ROW */);
+
+    // Get the cpptype result.
+    std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+    typename GenericType::cpptype range_sum;
+    int range_count;
+    std::size_t current_tuple = 0;
+    while (current_tuple < kNumTuples) {
+      // Start of new partition
+      if (current_tuple % kNumTuplesPerPartition == 0) {
+        SetDataType(0, &range_sum);
+        range_count = 0;
+      }
+
+      // We have to consider following tuples with the same order key value.
+      std::size_t next_tuple = current_tuple;
+      while (next_tuple < kNumTuples &&
+             next_tuple / kNumTuplesPerPartition == current_tuple / kNumTuplesPerPartition &&
+             next_tuple / kNumTuplesPerOrderKey == current_tuple / kNumTuplesPerOrderKey) {
+        typename GenericType::cpptype *value = argument_cpp_vector[next_tuple];
+        if (value != nullptr) {
+          range_sum += *value;
+          range_count++;
+        }
+
+        next_tuple++;
+      }
+
+      // Calculate the result cpp value.
+      typename OutputType::cpptype *result_cpp_value = nullptr;
+      if (range_count != 0) {
+        result_cpp_value = new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(range_sum) / range_count;
+      }
+
+      // Add the result value to all tuples with the same order key value.
+      while (current_tuple != next_tuple) {
+        range_result_cpp_vector.push_back(result_cpp_value);
+        current_tuple++;
       }
     }
 
-    CheckAvgValues(result_cpp_vector, result);
+    CheckAvgValues(range_result_cpp_vector, range_result);
   }
 
   template <typename GenericType, typename OutputType>
@@ -230,19 +293,22 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
     std::vector<ColumnVector*> arguments;
     arguments.push_back(argument_type_vector);
     // The partition key index is 0.
-    std::vector<attribute_id> partition_key(1, 0);
+    std::vector<attribute_id> partition_key(1, kPartitionKeyIndex);
+    std::vector<attribute_id> order_key(1, kOrderKeyIndex);
 
-    ColumnVector *result =
+    // Check ROWS mode.
+    ColumnVector *rows_result =
         handle_avg_->calculate(tuple_accessor,
-                               std::move(arguments),
+                               arguments,
                                partition_key,
+                               order_key,
                                true  /* is_row */,
                                kNumPreceding,
                                kNumFollowing);
 
     // Get the cpptype result.
     // For each value, calculate all surrounding values in the window.
-    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
 
     for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
       typename GenericType::cpptype sum;
@@ -281,16 +347,81 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
       }
 
       if (count == 0) {
-        result_cpp_vector.push_back(nullptr);
+        rows_result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+        rows_result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+    // Check RANGE mode.
+    ColumnVector *range_result =
+        handle_avg_->calculate(tuple_accessor,
+                               arguments,
+                               partition_key,
+                               order_key,
+                               false  /* is_row */,
+                               kNumPreceding,
+                               kNumFollowing);
+
+    // Get the cpptype result.
+    // For each value, calculate all surrounding values in the window.
+    std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        count++;
+      }
+
+      int preceding_bound = i / kNumTuplesPerOrderKey - kNumPreceding;
+      for (std::size_t precede = 1; precede <= kNumTuples; ++precede) {
+        // Stop when out of range or not in the same partition.
+        if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition ||
+            static_cast<int>((i - precede) / kNumTuplesPerOrderKey) < preceding_bound) {
+          break;
+        }
+
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          count++;
+        }
+      }
+
+      int following_bound = i / kNumTuplesPerOrderKey + kNumFollowing;
+      for (int follow = 1; follow <= kNumTuples; ++follow) {
+        // Stop when out of range or not in the same partition.
+        if (i + follow >= kNumTuples ||
+            i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+            static_cast<int>((i + follow) / kNumTuplesPerOrderKey) > following_bound) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        range_result_cpp_vector.push_back(nullptr);
       } else {
         typename OutputType::cpptype *result_cpp_value =
             new typename OutputType::cpptype;
         *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
-        result_cpp_vector.push_back(result_cpp_value);
+        range_result_cpp_vector.push_back(result_cpp_value);
       }
     }
 
-    CheckAvgValues(result_cpp_vector, result);
+    CheckAvgValues(range_result_cpp_vector, range_result);
   }
 
   std::unique_ptr<WindowAggregationHandle> handle_avg_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..88103df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1663,7 +1663,7 @@ void ExecutionGenerator::convertWindowAggregate(
       std::static_pointer_cast<const E::WindowAggregateFunction>(
           named_window_aggregate_expression->expression());
 
-  // Set the AggregateFunction.
+  // Set the WindowAggregateFunction.
   window_aggr_state_proto->mutable_function()->MergeFrom(
       window_aggregate_function->window_aggregate().getProto());
 
@@ -1683,6 +1683,15 @@ void ExecutionGenerator::convertWindowAggregate(
         ->MergeFrom(concretized_partition_by_attribute->getProto());
   }
 
+  // Set order keys.
+  for (const E::ScalarPtr &order_by_attribute
+      : window_info.order_by_attributes) {
+    unique_ptr<const Scalar> concretized_order_by_attribute(
+        order_by_attribute->concretize(attribute_substitution_map_));
+    window_aggr_state_proto->add_order_by_attributes()
+        ->MergeFrom(concretized_order_by_attribute->getProto());
+  }
+
   // Set window frame info.
   if (window_info.frame_info == nullptr) {
     // If the frame is not specified, use the default setting:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c224388..46808bf 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -814,9 +814,9 @@ L::LogicalPtr Resolver::resolveInsertSelection(
       cast_expressions.emplace_back(selection_attributes[aid]);
     } else {
       // TODO(jianqiao): implement Cast operation for non-numeric types.
-      if (destination_type.getSuperTypeID() == Type::kNumeric
-          && selection_type.getSuperTypeID() == Type::kNumeric
-          && destination_type.isSafelyCoercibleFrom(selection_type)) {
+      if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+          selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+          destination_type.isSafelyCoercibleFrom(selection_type)) {
         // Add cast operation
         const E::AttributeReferencePtr attr = selection_attributes[aid];
         const E::ExpressionPtr cast_expr =
@@ -1691,6 +1691,19 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
   // Resolve window frame
   if (parse_window.frame_info() != nullptr) {
     const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+    // For RANGE mode, the first attribute in ORDER BY must be numeric.
+    // TODO(Shixuan): Time-related types should also be supported. To handle
+    // this, some changes in the parser need to be made since the time range
+    // should be specified with time units. Also, UNBOUNDED flags might be
+    // needed because -1 might not make sense in this case.
+    if (!parse_frame_info->is_row &&
+        (order_by_attributes.empty() ||
+         order_by_attributes[0]->getValueType().getSuperTypeID() != Type::SuperTypeID::kNumeric)) {
+      THROW_SQL_ERROR_AT(&parse_window)
+          << "A numeric attribute should be specified as the first ORDER BY "
+          << "attribute in FRAME mode";
+    }
+
     frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
                                         parse_frame_info->num_preceding,
                                         parse_frame_info->num_following);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 30a3c39..6bada6c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -1025,17 +1025,40 @@ WINDOW w AS
 +--------------------+-----------+------------------------+
 ==
 
-# Currently this is not supported, an empty table will be returned.
-SELECT int_col, sum(float_col) OVER
-(PARTITION BY char_col, long_col
- ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+SELECT float_col, double_col, avg(double_col) OVER
+(ORDER BY float_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING)
 FROM test;
 --
-+-----------+------------------------+
-|int_col    |sum(float_col)          |
-+-----------+------------------------+
-+-----------+------------------------+
++---------------+------------------------+------------------------+
+|float_col      |double_col              |avg(double_col)         |
++---------------+------------------------+------------------------+
+|     4.89897966|      117.57550765359254|     -5.2010907233390986|
+|     4.79583168|     -110.30412503619254|     -3.3458568752518572|
+|     4.69041586|      103.18914671611546|     -3.3458568752518572|
+|      4.5825758|     -96.234089594072643|     -4.2942570191393745|
+|     4.47213602|                    NULL|     -4.2942570191393745|
+|     4.35889912|      -82.81907992727281|     -3.1771278735018194|
+|      4.2426405|      76.367532368147124|     -3.1771278735018194|
+|     4.12310553|     -70.092795635500224|     -3.6217507631683268|
+|              4|                      64|     -3.0100796703699935|
+|     3.87298346|     -58.094750193111253|     -3.0100796703699935|
+|      3.7416575|       52.38320341483518|     -3.0100796703699935|
+|     3.60555124|     -46.872166581031856|     -3.1193833079868254|
+|     3.46410155|      41.569219381653056|     -3.1193833079868254|
+|     3.31662488|       -36.4828726939094|     -2.8361542397614437|
+|      3.1622777|                    NULL|     -2.8361542397614437|
+|              3|                     -27|     -2.7526926834086507|
+|     2.82842708|      22.627416997969522|     -8.4826069851706123|
+|     2.64575124|     -18.520259177452136|     -9.0010404404476727|
+|     2.44948983|      14.696938456699067|     -4.1547599319129516|
+|     2.23606801|     -11.180339887498949|     -4.2708832009567148|
+|              2|                       8|     0.11724429467951912|
+|     1.73205078|      -5.196152422706632|     -4.7108157334609286|
+|     1.41421354|      2.8284271247461903|     -5.1226841602152344|
+|              1|                      -1|      -1.638218767582549|
+|              0|                    NULL|      1.1580686755098886|
++---------------+------------------------+------------------------+
 ==
 
 SELECT sum(avg(int_col) OVER w) FROM test

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 0cdfc1a..45119ab 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -56,6 +56,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
     const bool is_row,
     const std::int64_t num_preceding,
     const std::int64_t num_following,
@@ -84,10 +85,21 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     partition_by_types.push_back(&partition_by_attribute->getType());
   }
 
+  // IDs of order keys and type of the first order key if in RANGE mode.
+  const Type* first_order_by_type = nullptr;
+  for (const std::unique_ptr<const Scalar> &order_by_attribute : order_by_attributes) {
+    order_by_ids_.push_back(
+        order_by_attribute->getAttributeIdForValueAccessor());
+    if (!is_row && first_order_by_type == nullptr) {
+      first_order_by_type = &order_by_attribute->getType();
+    }
+  }
+
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
       window_aggregate_function->createHandle(std::move(argument_types),
-                                              std::move(partition_by_types)));
+                                              std::move(partition_by_types),
+                                              first_order_by_type));
 }
 
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -113,6 +125,15 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
         database));
   }
 
+  std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    order_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.order_by_attributes(attribute_idx),
+        database));
+  }
+
   const bool is_row = proto.is_row();
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
@@ -121,6 +142,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
                                              &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
                                              std::move(arguments),
                                              partition_by_attributes,
+                                             order_by_attributes,
                                              is_row,
                                              num_preceding,
                                              num_following,
@@ -160,6 +182,15 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
     }
   }
 
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    if (!ScalarFactory::ProtoIsValid(proto.order_by_attributes(attribute_idx),
+                                     database)) {
+      return false;
+    }
+  }
+
   if (proto.num_preceding() < -1 || proto.num_following() < -1) {
     return false;
   }
@@ -177,14 +208,6 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     return;
   }
 
-  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
-  // work. This will need Order Key to be passed so that we know where the
-  // window should start and end.
-  if (!is_row_) {
-    std::cout << "Currently we don't support RANGE frame mode :(\n";
-    return;
-  }
-
   // Get the total number of tuples.
   int num_tuples = 0;
   for (const block_id block_idx : block_ids) {
@@ -226,7 +249,11 @@ void WindowAggregationOperationState::windowAggregateBlocks(
                                      block->getIndices(),
                                      block->getIndicesConsistent());
     ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
-    ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+    ColumnVectorsValueAccessor *argument_accessor = nullptr;
+    if (!arguments_.empty()) {
+      argument_accessor = new ColumnVectorsValueAccessor();
+    }
+
     for (const std::unique_ptr<const Scalar> &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
                                                           &sub_block_ref));
@@ -235,9 +262,15 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();
-      argument_accessor->beginIteration();
+      if (argument_accessor != nullptr) {
+        argument_accessor->beginIteration();
+      }
+
+      while (tuple_accessor->next()) {
+        if (argument_accessor != nullptr) {
+          argument_accessor->next();
+        }
 
-      while (tuple_accessor->next() && argument_accessor->next()) {
         for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
           ColumnVector *attr_vec = attribute_vecs[attr_id];
           if (attr_vec->isNative()) {
@@ -275,8 +308,9 @@ void WindowAggregationOperationState::windowAggregateBlocks(
   // Do actual calculation in handle.
   ColumnVector *window_aggregates =
       window_aggregation_handle_->calculate(all_blocks_accessor,
-                                            std::move(argument_vecs),
+                                            argument_vecs,
                                             partition_by_ids_,
+                                            order_by_ids_,
                                             is_row_,
                                             num_preceding_,
                                             num_following_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 9792a99..cb29b92 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -57,6 +57,7 @@ class WindowAggregationOperationState {
    *                                   computed.
    * @param arguments A list of argument expressions to that aggregate.
    * @param partition_by_attributes A list of window partition key.
+   * @param order_by_attributes A list of window order keys.
    * @param is_row True if the window frame is calculated by ROW, false if it is
    *               calculated by RANGE.
    * @param num_preceding The number of rows/range for the tuples preceding the
@@ -69,6 +70,7 @@ class WindowAggregationOperationState {
                                   const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+                                  const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
                                   const bool is_row,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following,
@@ -121,6 +123,7 @@ class WindowAggregationOperationState {
   std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
   std::vector<std::unique_ptr<const Scalar>> arguments_;
   std::vector<attribute_id> partition_by_ids_;
+  std::vector<attribute_id> order_by_ids_;
 
   // Frame info.
   const bool is_row_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ff513a6d/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index d888461..f879713 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -30,4 +30,5 @@ message WindowAggregationOperationState {
   required bool is_row = 5;
   required int64 num_preceding = 6;  // -1 means UNBOUNDED PRECEDING.
   required int64 num_following = 7;  // -1 means UNBOUNDED FOLLOWING.
+  repeated Scalar order_by_attributes = 8;
 }