You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/11 14:12:20 UTC

[1/3] incubator-quickstep git commit: Added handle for avg

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation [created] d5f535ee9


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_expressions_tablegenerator_GeneratorFunction
                       quickstep_expressions_tablegenerator_GeneratorFunctionFactory
                       quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_parser_ParseAssignment
                       quickstep_parser_ParseBasicExpressions
                       quickstep_parser_ParseBlockProperties

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..11348fe 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
 #include "expressions/table_generator/GeneratorFunction.hpp"
 #include "expressions/table_generator/GeneratorFunctionFactory.hpp"
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "parser/ParseAssignment.hpp"
 #include "parser/ParseBasicExpressions.hpp"
 #include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << "COUNT aggregate has both star (*) and non-star arguments.";
   }
 
-  // Try to look up the AggregateFunction by name using
-  // AggregateFunctionFactory.
-  const ::quickstep::AggregateFunction *aggregate
-      = AggregateFunctionFactory::GetByName(function_name);
-  if (aggregate == nullptr) {
+  // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+  // TODO(Shixuan): We might want to create a new abstract class Function to
+  // include both AggregateFunction and WindowAggregateFunction, which will make
+  // this part of code cleaner.
+  const ::quickstep::AggregateFunction *aggregate;
+  const ::quickstep::WindowAggregateFunction *window_aggregate;
+  if (parse_function_call.isWindow()) {
+    window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+  } else {
+    aggregate = AggregateFunctionFactory::GetByName(function_name);
+  }
+
+  if (aggregate == nullptr && window_aggregate == nullptr) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Unrecognized function name \""
         << parse_function_call.name()->value()
@@ -2656,11 +2666,12 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure a naked COUNT() with no arguments wasn't specified.
-  if ((aggregate->getAggregationID() == AggregationID::kCount)
-      && (resolved_arguments.empty())
-      && (!count_star)) {
-    THROW_SQL_ERROR_AT(&parse_function_call)
-        << "COUNT aggregate requires an argument (either scalar or star (*))";
+  if ((aggregate != nullptr && aggregate->getAggregationID() == AggregationID::kCount)
+      || (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+    if ((resolved_arguments.empty()) && !count_star) {
+      THROW_SQL_ERROR_AT(&parse_function_call)
+          << "COUNT function requires an argument (either scalar or star (*))";
+    }
   }
 
   // Resolve each of the Scalar arguments to the aggregate.
@@ -2670,16 +2681,17 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure that the aggregate can apply to the specified argument(s).
-  if (!aggregate->canApplyToTypes(argument_types)) {
+  if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+      || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
-        << "Aggregate function " << aggregate->getName()
+        << "Function " << aggregate->getName()
         << " can not apply to the given argument(s).";
   }
 
   if (parse_function_call.isWindow()) {
     return resolveWindowAggregateFunction(parse_function_call,
                                           expression_resolution_info,
-                                          aggregate,
+                                          window_aggregate,
                                           resolved_arguments);
   }
 
@@ -2705,7 +2717,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
 E::ScalarPtr Resolver::resolveWindowAggregateFunction(
     const ParseFunctionCall &parse_function_call,
     ExpressionResolutionInfo *expression_resolution_info,
-    const ::quickstep::AggregateFunction *window_aggregate,
+    const ::quickstep::WindowAggregateFunction *window_aggregate,
     const std::vector<E::ScalarPtr> &resolved_arguments) {
   // A window aggregate function might be defined OVER a window name or a window.
   E::WindowAggregateFunctionPtr window_aggregate_function;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
    * @param expression_resolution_info Resolution info that contains the name
    *                                   resolver and info to be updated after
    *                                   resolution.
-   * @param aggregate The aggregate function.
+   * @param aggregate The window aggregate function.
    * @param resolved_arguments The resolved arguments.
    * @return An expression in the query optimizer.
    */
   expressions::ScalarPtr resolveWindowAggregateFunction(
       const ParseFunctionCall &parse_function_call,
       ExpressionResolutionInfo *expression_resolution_info,
-      const ::quickstep::AggregateFunction *aggregate,
+      const ::quickstep::WindowAggregateFunction *aggregate,
       const std::vector<expressions::ScalarPtr> &resolved_arguments);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
+                      quickstep_storage_WindowAggregationOperationState
                       quickstep_utility_Macros
                       tmb)                      
 target_link_libraries(quickstep_relationaloperators_WorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..ec2f27c 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
 
 #include <vector>
 
+#include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
 
 #include "tmb/id_typedefs.h"
 
@@ -44,6 +46,7 @@ bool WindowAggregationOperator::getAllWorkOrders(
         new WindowAggregationWorkOrder(
             query_id_,
             query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+            block_ids_,
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     generated_ = true;
@@ -67,6 +70,9 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
                       window_aggregation_state_index_);
+  for (block_id bid : block_ids_) {
+    proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+  }
   proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
                       output_destination_index_);
 
@@ -75,8 +81,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  std::cout << "Window aggregation is not supported yet.\n"
-      << "An empty table is returned\n";
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..4084ffc 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of this query.
    * @param input_relation The relation to perform aggregation over.
+   * @param output_relation The relation for output.
    * @param window_aggregation_state_index The index of WindowAggregationState
    *                                       in QueryContext.
    * @param output_destination_index The index of InsertDestination in
    *                                 QueryContext for the output.
    **/
   WindowAggregationOperator(const std::size_t query_id,
+                            const CatalogRelation &input_relation,
                             const CatalogRelation &output_relation,
                             const QueryContext::window_aggregation_state_id window_aggregation_state_index,
                             const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
+        block_ids_(input_relation.getBlocksSnapshot()),
         output_relation_(output_relation),
         window_aggregation_state_index_(window_aggregation_state_index),
         output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
    **/
   serialization::WorkOrder* createWorkOrderProto();
 
+  const std::vector<block_id> block_ids_;
   const CatalogRelation &output_relation_;
   const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
   const QueryContext::insert_destination_id output_destination_index_;
@@ -121,39 +125,20 @@ class WindowAggregationWorkOrder : public WorkOrder {
    **/
   WindowAggregationWorkOrder(const std::size_t query_id,
                              WindowAggregationOperationState *state,
+                             const std::vector<block_id> &block_ids,
                              InsertDestination *output_destination)
       : WorkOrder(query_id),
         state_(state),
+        block_ids_(block_ids),
         output_destination_(output_destination)  {}
 
   ~WindowAggregationWorkOrder() override {}
 
-  /**
-   * @brief Get the pointer to WindowAggregationOperationState.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the window aggregation operation state.
-   **/
-  WindowAggregationOperationState* state() {
-    return state_;
-  }
-
-  /**
-   * @brief Get the pointer to output destination.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the output destination.
-   **/
-  InsertDestination* output_destination() {
-    return output_destination_;
-  }
-
   void execute() override;
 
  private:
   WindowAggregationOperationState *state_;
+  const std::vector<block_id> block_ids_;
   InsertDestination *output_destination_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
   extend WorkOrder {
     // All required
     optional uint32 window_aggr_state_index = 336;
-    optional int32 insert_destination_index = 337;
+    repeated fixed64 block_ids = 337;
+    optional int32 insert_destination_index = 338;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..a0bb8ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,20 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_ExpressionFactories
                       quickstep_expressions_Expressions_proto
-                      quickstep_expressions_aggregation_AggregateFunction
-                      quickstep_expressions_aggregation_AggregateFunctionFactory
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
                       quickstep_storage_StorageBlockInfo
+                      quickstep_storage_InsertDestination
                       quickstep_storage_StorageManager
                       quickstep_storage_WindowAggregationOperationState_proto
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
-                      quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_expressions_Expressions_proto
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..8d05b79 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,12 +31,13 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/ExpressionFactories.hpp"
 #include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 
@@ -46,7 +47,7 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    const AggregateFunction *window_aggregate_function,
+    const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
     const bool is_row,
@@ -55,7 +56,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       arguments_(std::move(arguments)),
-      partition_by_attributes_(std::move(partition_by_attributes)),
       is_row_(is_row),
       num_preceding_(num_preceding),
       num_following_(num_following),
@@ -71,11 +71,19 @@ WindowAggregationOperationState::WindowAggregationOperationState(
   // Check if window aggregate function could apply to the arguments.
   DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
 
+  // IDs and types of partition keys.
+  std::vector<attribute_id> partition_by_ids;
+  std::vector<const Type*> partition_by_types;
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    partition_by_ids.push_back(
+        partition_by_attribute->getAttributeIdForValueAccessor());
+    partition_by_types.push_back(&partition_by_attribute->getType());
+  }
+
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(argument_types));
-  window_aggregation_state_.reset(
-      window_aggregation_handle_->createInitialState());
+      window_aggregate_function->createHandle(std::move(argument_types),
+                                              std::move(partition_by_types)));
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // See if all of this window aggregate's arguments are attributes in the input
@@ -88,7 +96,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
       arguments_as_attributes_.clear();
       break;
     } else {
-      DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+      DCHECK_EQ(input_relation.getID(), argument->getRelationIdForValueAccessor());
       arguments_as_attributes_.push_back(argument_id);
     }
   }
@@ -102,8 +110,8 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
-  const AggregateFunction *aggregate_function
-      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+  const WindowAggregateFunction *window_aggregate_function
+      = &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
 
   std::vector<std::unique_ptr<const Scalar>> arguments;
   arguments.reserve(proto.arguments_size());
@@ -126,8 +134,8 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
 
-  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
-                                             aggregate_function,
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+                                             window_aggregate_function,
                                              std::move(arguments),
                                              std::move(partition_by_attributes),
                                              is_row,
@@ -139,11 +147,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
                                                    const CatalogDatabaseLite &database) {
   if (!proto.IsInitialized() ||
-      !database.hasRelationWithId(proto.relation_id())) {
+      !database.hasRelationWithId(proto.input_relation_id())) {
     return false;
   }
 
-  if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+  if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
     return false;
   }
 
@@ -176,4 +184,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
   return true;
 }
 
+void WindowAggregationOperationState::windowAggregateBlocks(
+    InsertDestination *output_destination,
+    const std::vector<block_id> &block_ids) {
+  window_aggregation_handle_->calculate(arguments_,
+                                        block_ids,
+                                        partition_by_ids_,
+                                        input_relation_,
+                                        is_row_,
+                                        num_preceding_,
+                                        num_following_,
+                                        storage_manager_,
+                                        output_destination);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..f38dbd8 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,7 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -34,7 +34,7 @@
 
 namespace quickstep {
 
-class AggregateFunction;
+class WindowAggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
@@ -67,7 +67,7 @@ class WindowAggregationOperationState {
    *        tables.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
-                                  const AggregateFunction *window_aggregate_function,
+                                  const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
                                   const bool is_row,
@@ -107,54 +107,25 @@ class WindowAggregationOperationState {
                            const CatalogDatabaseLite &database);
 
   /**
-   * @brief Get the is_row info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   * 
-   * @return True if the frame mode is ROW, false if it is RANGE.
-   **/
-  const bool is_row() const { return is_row_; }
-
-  /**
-   * @brief Get the num_preceding info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return The number of rows/range that precedes the current row.
-   **/
-  const std::int64_t num_preceding() const { return num_preceding_; }
-
-  /**
-   * @brief Get the num_following info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return The number of rows/range that follows the current row.
-   **/
-  const std::int64_t num_following() const { return num_following_; }
-
-  /**
-   * @brief Get the pointer to StorageManager.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
+   * @brief Compute window aggregates on the tuples of the given relation.
    *
-   * @return A pointer to the storage manager.
+   * @param output_destination The output destination for the computed window
+   *                           aggregate.
    **/
-  StorageManager *storage_manager() { return storage_manager_; }
+  void windowAggregateBlocks(InsertDestination *output_destination,
+                             const std::vector<block_id> &block_ids);
 
  private:
   const CatalogRelationSchema &input_relation_;
 
   // TODO(Shixuan): Handle and State for window aggregation will be needed for
   //                actual calculation.
-  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
-  std::unique_ptr<AggregationState> window_aggregation_state_;
+  std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
+  
   std::vector<std::unique_ptr<const Scalar>> arguments_;
+  std::vector<attribute_id> partition_by_ids_;
 
-  // We don't add order_by_attributes here since it is not needed after sorting.
-  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
-  // Window framing information.
+  // Frame info.
   const bool is_row_;
   const std::int64_t num_preceding_;
   const std::int64_t num_following_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..4dc0a6a 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,15 +19,15 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
 import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
-  required int32 relation_id = 1;
-  required AggregateFunction function = 2;
-  repeated Scalar arguments = 3;
-  repeated Scalar partition_by_attributes = 4;
-  required bool is_row = 5;
-  required int64 num_preceding = 6;  // -1 means UNBOUNDED PRECEDING.
-  required int64 num_following = 7;  // -1 means UNBOUNDED FOLLOWING.
+  required int32 input_relation_id = 1;
+  required WindowAggregateFunction function = 3;
+  repeated Scalar arguments = 4;
+  repeated Scalar partition_by_attributes = 5;
+  required bool is_row = 6;
+  required int64 num_preceding = 7;  // -1 means UNBOUNDED PRECEDING.
+  required int64 num_following = 8;  // -1 means UNBOUNDED FOLLOWING.
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(kInvalidTableId);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(kInvalidTableId);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kInvalidNum);
   proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);


[3/3] incubator-quickstep git commit: Separated calculation into two parts to check intermediate result

Posted by sh...@apache.org.
Separated calculation into two parts to check intermediate result


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5f535ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5f535ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5f535ee

Branch: refs/heads/SQL-window-aggregation
Commit: d5f535ee9f0e3c7a8ea7615a07699b83c5f905f4
Parents: 2966405
Author: shixuan-fan <sh...@apache.org>
Authored: Mon Jul 11 14:11:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Mon Jul 11 14:11:47 2016 +0000

----------------------------------------------------------------------
 .../WindowAggregateFunction.hpp                 |   4 +
 .../WindowAggregateFunctionAvg.cpp              |   6 +-
 .../WindowAggregateFunctionAvg.hpp              |   2 +
 .../WindowAggregationHandle.hpp                 |  16 +-
 .../WindowAggregationHandleAvg.cpp              |  78 ++++++----
 .../WindowAggregationHandleAvg.hpp              |  11 +-
 .../WindowAggregationHandleAvg_unittest.cpp     | 148 ++++++++++++++++++-
 query_optimizer/ExecutionGenerator.cpp          |   5 +
 query_optimizer/resolver/Resolver.cpp           |   8 +-
 .../WindowAggregationOperator.cpp               |   3 +-
 storage/WindowAggregationOperationState.cpp     |  25 +++-
 storage/WindowAggregationOperationState.hpp     |   5 +-
 storage/WindowAggregationOperationState.proto   |   1 +
 13 files changed, 253 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 9cc5d74..84d97fc 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -26,10 +26,12 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+class CatalogRelationSchema;
 class WindowAggregationHandle;
 class Type;
 
@@ -128,6 +130,8 @@ class WindowAggregateFunction {
    *         is responsible for deleting the returned object.
    **/
   virtual WindowAggregationHandle* createHandle(
+      const CatalogRelationSchema &relation,
+      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index e9a4453..06ff1d9 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -72,13 +72,17 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
 }
 
 WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+    const CatalogRelationSchema &relation,
+    const std::vector<block_id> block_ids,
     std::vector<const Type*> &&argument_types,
     std::vector<const Type*> &&partition_key_types) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
       << " that AVG can not be applied to.";
 
-  return new WindowAggregationHandleAvg(*argument_types.front(),
+  return new WindowAggregationHandleAvg(relation,
+                                        block_ids,
+                                        *argument_types.front(),
                                         std::move(partition_key_types));
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 18e1022..91acf7e 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -57,6 +57,8 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
       const std::vector<const Type*> &argument_types) const override;
 
   WindowAggregationHandle* createHandle(
+      const CatalogRelationSchema &relation,
+      const std::vector<block_id> block_ids,
       std::vector<const Type*> &&argument_types,
       std::vector<const Type*> &&partition_key_types) const override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 77b1e76..6b7988a 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -99,14 +99,13 @@ class WindowAggregationHandle {
    * @param output_destination The destination for output.
    **/
   virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                         const std::vector<block_id> &block_ids,
                          const std::vector<attribute_id> &partition_by_ids,
-                         const CatalogRelationSchema &relation,
                          const bool is_row,
                          const std::int64_t num_preceding,
                          const std::int64_t num_following,
-                         StorageManager *storage_manager,
-                         InsertDestinationInterface *output_destination) const = 0;
+                         StorageManager *storage_manager) = 0;
+
+  virtual std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager) = 0;
 
  protected:
   /**
@@ -119,7 +118,14 @@ class WindowAggregationHandle {
    * @param num_following The number of rows/range that follows the current row.
    * @param storage_manager A pointer to the storage manager.
    **/
-  WindowAggregationHandle() {}
+  WindowAggregationHandle(const CatalogRelationSchema &relation,
+                          const std::vector<block_id> block_ids)
+      : block_ids_(block_ids),
+        relation_(relation) {}
+
+  std::vector<ColumnVector*> window_aggregates_;
+  const std::vector<block_id> block_ids_;
+  const CatalogRelationSchema &relation_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 62f5a88..e6e2894 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -25,6 +25,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageManager.hpp"
@@ -51,9 +52,12 @@ namespace quickstep {
 class StorageManager;
 
 WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const CatalogRelationSchema &relation,
+    const std::vector<block_id> &block_ids,
     const Type &type,
     std::vector<const Type*> &&partition_key_types)
-    : argument_type_(type) {
+    : WindowAggregationHandle(relation, block_ids),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_id;
@@ -98,24 +102,20 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
 }
 
 void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                                           const std::vector<block_id> &block_ids,
                                            const std::vector<attribute_id> &partition_by_ids,
-                                           const CatalogRelationSchema &relation,
                                            const bool is_row,
                                            const std::int64_t num_preceding,
                                            const std::int64_t num_following,
-                                           StorageManager *storage_manager,
-                                           InsertDestinationInterface *output_destination) const {
+                                           StorageManager *storage_manager) {
   DCHECK(arguments.size() == 1);
-  DCHECK(!block_ids.empty());
   
   // Initialize the tuple accessors and argument accessors.
   // Index of each value accessor indicates the block it belongs to.
   std::vector<ValueAccessor*> tuple_accessors;
   std::vector<ColumnVectorsValueAccessor*> argument_accessors;
-  for (block_id bid : block_ids) {
+  for (block_id bid : block_ids_) {
     // Get tuple accessor.
-    BlockReference block = storage_manager->getBlock(bid, relation);
+    BlockReference block = storage_manager->getBlock(bid, relation_);
     const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
     ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
     tuple_accessors.push_back(tuple_accessor);
@@ -132,12 +132,14 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
 
   // Create a window for each tuple and calculate the window aggregate.
   for (std::uint32_t current_block_index = 0;
-       current_block_index < block_ids.size();
+       current_block_index < block_ids_.size();
        ++current_block_index) {
     ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
     ColumnVectorsValueAccessor* argument_accessor =
         argument_accessors[current_block_index];
-    
+    NativeColumnVector window_aggregates_for_block(*result_type_,
+                                                   argument_accessor->getNumTuples());
+
     InvokeOnAnyValueAccessor (
         tuple_accessor,
         [&] (auto *tuple_accessor) -> void {
@@ -145,24 +147,48 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
       argument_accessor->beginIteration();
       
       while (tuple_accessor->next() && argument_accessor->next()) {
-        TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
-                                                               argument_accessors,
-                                                               partition_by_ids,
-                                                               current_block_index,
-                                                               is_row,
-                                                               num_preceding,
-                                                               num_following);
-        Tuple *current_tuple = tuple_accessor->getTuple();
-        std::vector<TypedValue> new_tuple;
-        for (TypedValue value : *current_tuple) {
-          new_tuple.push_back(value);
-        }
-
-        new_tuple.push_back(window_aggregate);
-        output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+        const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+                                                                     argument_accessors,
+                                                                     partition_by_ids,
+                                                                     current_block_index,
+                                                                     is_row,
+                                                                     num_preceding,
+                                                                     num_following);
+        window_aggregates_for_block.appendTypedValue(window_aggregate);
       }
     });
+
+    window_aggregates_.push_back(&window_aggregates_for_block);
+  }
+}
+
+std::vector<ValueAccessor*>&& WindowAggregationHandleAvg::finalize(
+    StorageManager *storage_manager) {
+  std::vector<ValueAccessor*> accessors;
+  
+  // Create a ValueAccessor for each block, including the new window aggregate
+  // attribute.
+  for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+    // Get the block information.
+    BlockReference block = storage_manager->getBlock(block_idx, relation_);
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    ValueAccessor *block_accessor = tuple_block.createValueAccessor();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    ColumnVectorsValueAccessor accessor;
+
+    for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+         attr_it != relation_.end();
+         ++attr_it) {
+      ScalarAttribute scalar_attr(*attr_it);
+      accessor.addColumn(scalar_attr.getAllValues(block_accessor, &sub_block_ref));
+    }
+
+    accessors.push_back(&accessor);
   }
+
+  return std::move(accessors);
 }
 
 TypedValue WindowAggregationHandleAvg::calculateOneWindow(
@@ -287,7 +313,7 @@ bool WindowAggregationHandleAvg::samePartition(
     const std::vector<attribute_id> &partition_by_ids) const {
   return InvokeOnAnyValueAccessor (tuple_accessor,
                                    [&] (auto *tuple_accessor) -> bool {
-    for (std::uint32_t partition_by_index = 0;
+    for (std::size_t partition_by_index = 0;
          partition_by_index < partition_by_ids.size();
          ++partition_by_index) {
       if (!equal_comparators_[partition_by_index]->compareTypedValues(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 115152e..e0ec766 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -56,14 +56,13 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
   ~WindowAggregationHandleAvg() override {}
 
   void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                 const std::vector<block_id> &block_ids,
                  const std::vector<attribute_id> &partition_by_ids,
-                 const CatalogRelationSchema &relation,
                  const bool is_row,
                  const std::int64_t num_preceding,
                  const std::int64_t num_following,
-                 StorageManager *storage_manager,
-                 InsertDestinationInterface *output_destination) const;
+                 StorageManager *storage_manager);
+
+  std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager);
 
  private:
   friend class WindowAggregateFunctionAvg;
@@ -79,7 +78,9 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
    * @param storage_manager A pointer to the storage manager.
    * @param type Type of the avg value.
    **/
-  explicit WindowAggregationHandleAvg(const Type &type,
+  explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
+                                      const std::vector<block_id> &block_ids,
+                                      const Type &type,
                                       std::vector<const Type*> &&partition_key_types);
 
   TypedValue calculateOneWindow(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 6a7d161..326afa7 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -51,12 +51,142 @@ namespace quickstep {
 
 namespace {
 
-  constexpr int kNumSamples = 100;
+  constexpr int kNumTuplesPerBlock = 100;
+  constexpr int kNumBlocks = 5;
+  constexpr int kNumTuplesPerPartition = 8;
 
 }  // namespace
 
-class WindowAggregationHandleAvgTest : public::testing::Test {
+// Attribute value could be null if set true.
+class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
  protected:
+  virtual void SetUp() {
+    // Initialize relation and storage manager.
+    relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
+    storage_manager_.reset(new StorageManager("TestAvg"));
+
+    // Add All kinds of TypedValues.
+    CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
+                                                      "int_attr",
+                                                      TypeFactory::GetType(kInt, GetParam()));
+
+    relation_->addAttribute(int_attr);
+
+    CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
+                                                        "float_attr",
+                                                        TypeFactory::GetType(kFloat, GetParam()));
+    relation_->addAttribute(float_attr);
+
+    CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
+                                                       "long_attr",
+                                                       TypeFactory::GetType(kLong, GetParam()));
+    relation_->addAttribute(long_attr);
+
+    CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
+                                                         "double_attr",
+                                                         TypeFactory::GetType(kDouble, GetParam()));
+    relation_->addAttribute(double_attr);
+
+    CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
+                                                       "char_attr",
+                                                       TypeFactory::GetType(kChar, 4, GetParam()));
+    relation_->addAttribute(char_attr);
+
+    CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
+                                                          "varchar_attr",
+                                                          TypeFactory::GetType(kVarChar, 32, GetParam()));
+    relation_->addAttribute(varchar_attr);
+    
+    // Records the 'base_value' of a tuple used in createSampleTuple.
+    CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
+                                                             "partition_value",
+                                                             TypeFactory::GetType(kInt, false));
+    relation_->addAttribute(partition_value);
+
+    StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
+
+    // Initialize blocks.
+    for (int i = 0; i < kNumBlocks; ++i) {
+      block_id bid = storage_manager_->createBlock(relation_, layout);
+      relation_->addBlock(bid);
+      insertTuples(bid);
+    }
+  }
+
+  // Insert kNumTuplesPerBlock tuples into the block.
+  void insertTuples(block_id bid) {
+    MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
+    for (int i = 0; i < kNumTuplesPerBlock; ++i) {
+      Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
+      block->insertTuple(*tuple);
+    }
+  }
+
+  Tuple* createTuple(int base_value) {
+    std::vector<TypedValue> attrs;
+
+    // int_attr.
+    if (GetParam() && base_value % 10 == 0) {
+      // Throw in a NULL integer for every ten values.
+      attrs.emplace_back(kInt);
+    } else {
+      attrs.emplace_back(base_value);
+    }
+
+    // float_attr.
+    if (GetParam() && base_value % 10 == 1) {
+      attrs.emplace_back(kFloat);
+    } else {
+      attrs.emplace_back(static_cast<float>(0.4 * base_value));
+    }
+
+    // long_attr.
+    if (GetParam() && base_value % 10 == 2) {
+      attrs.emplace_back(kLong);
+    } else {
+      attrs.emplace_back(static_cast<std::int64_t>(base_value));
+    }
+
+    // double_attr.
+    if (GetParam() && base_value % 10 == 3) {
+      attrs.emplace_back(kDouble);
+    } else {
+      attrs.emplace_back(static_cast<double>(0.25 * base_value));
+    }
+
+    // char_attr
+    if (GetParam() && base_value % 10 == 4) {
+      attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
+    } else {
+      std::ostringstream char_buffer;
+      char_buffer << base_value;
+      std::string string_literal(char_buffer.str());
+      attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
+          string_literal.c_str(),
+          string_literal.size() > 3 ? 4
+                                    : string_literal.size() + 1));
+      attrs.back().ensureNotReference();
+    }
+
+    // varchar_attr
+    if (GetParam() && base_value % 10 == 5) {
+      attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
+    } else {
+      std::ostringstream char_buffer;
+      char_buffer << "Here are some numbers: " << base_value;
+      std::string string_literal(char_buffer.str());
+      attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
+          string_literal.c_str(),
+          string_literal.size() + 1));
+      attrs.back().ensureNotReference();
+    }
+
+    // base_value
+    attrs.emplace_back(base_value / kNumTuplesPerPartition);
+    return new Tuple(std::move(attrs));
+}
+  }
+  
   // Handle initialization.
   void initializeHandle(const Type &argument_type,
                         const std::vector<const Type*> &partition_key_types) {
@@ -86,11 +216,13 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
   }
 
   template <typename CppType>
-  static void CheckAvgValue(
-      CppType expected,
-      const AggregationHandle &handle,
-      const AggregationState &state) {
-    EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+  static void CheckAvgValues(
+      std::vector<CppType> expected,
+      const ColumnVector* actual) {
+    EXPECT_EQ(expected.size(), actual->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+    }
   }
 
   // Static templated method for set a meaningful value to data types.
@@ -237,6 +369,8 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
   std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
   std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<CatalogRelation> relation_;
+  std::vector<block_id> block_ids_;
 };
 
 const int AggregationHandleAvgTest::kNumSamples;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..06d47d2 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,6 +1656,11 @@ void ExecutionGenerator::convertWindowAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
+  // Get relation blocks.
+  for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
+    window_aggr_state_proto->add_block_ids(bid);
+  }
+
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
       physical_plan->window_aggregate_expression();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 11348fe..d28213c 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -2630,8 +2630,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   // TODO(Shixuan): We might want to create a new abstract class Function to
   // include both AggregateFunction and WindowAggregateFunction, which will make
   // this part of code cleaner.
-  const ::quickstep::AggregateFunction *aggregate;
-  const ::quickstep::WindowAggregateFunction *window_aggregate;
+  const ::quickstep::AggregateFunction *aggregate = nullptr;
+  const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
   if (parse_function_call.isWindow()) {
     window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
   } else {
@@ -2670,7 +2670,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
       || (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
     if ((resolved_arguments.empty()) && !count_star) {
       THROW_SQL_ERROR_AT(&parse_function_call)
-          << "COUNT function requires an argument (either scalar or star (*))";
+          << "COUNT aggregate requires an argument (either scalar or star (*))";
     }
   }
 
@@ -2684,7 +2684,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
       || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
-        << "Function " << aggregate->getName()
+        << "Aggregate function " << aggregate->getName()
         << " can not apply to the given argument(s).";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index ec2f27c..4c2e2b5 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -81,8 +81,7 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  state_->windowAggregateBlocks(output_destination_,
-                                block_ids_);
+  state_->windowAggregateBlocks(output_destination_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 8d05b79..9029bd4 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,6 +47,7 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
+    std::vector<block_id> &&block_ids,
     const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -55,6 +56,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
+      block_ids_(std::move(block_ids)),
       arguments_(std::move(arguments)),
       is_row_(is_row),
       num_preceding_(num_preceding),
@@ -82,7 +84,9 @@ WindowAggregationOperationState::WindowAggregationOperationState(
 
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(std::move(argument_types),
+      window_aggregate_function->createHandle(input_relation_,
+                                              block_ids_,
+                                              std::move(argument_types),
                                               std::move(partition_by_types)));
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -109,6 +113,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
     StorageManager *storage_manager) {
   DCHECK(ProtoIsValid(proto, database));
 
+  std::vector<block_id> block_ids;
+  for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
+    block_ids.push_back(proto.block_ids(block_idx));
+  }
+
   // Rebuild contructor arguments from their representation in 'proto'.
   const WindowAggregateFunction *window_aggregate_function
       = &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -135,6 +144,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_following = proto.num_following();
 
   return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+                                             std::move(block_ids),
                                              window_aggregate_function,
                                              std::move(arguments),
                                              std::move(partition_by_attributes),
@@ -185,17 +195,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
 }
 
 void WindowAggregationOperationState::windowAggregateBlocks(
-    InsertDestination *output_destination,
-    const std::vector<block_id> &block_ids) {
+    InsertDestination *output_destination) {
   window_aggregation_handle_->calculate(arguments_,
-                                        block_ids,
                                         partition_by_ids_,
-                                        input_relation_,
                                         is_row_,
                                         num_preceding_,
                                         num_following_,
-                                        storage_manager_,
-                                        output_destination);
+                                        storage_manager_);
+  std::vector<ValueAccessor*> output_accessors(
+      window_aggregation_handle_->finalize(storage_manager_));
+  for (ValueAccessor* output_accessor : output_accessors) {
+    output_destination->bulkInsertTuples(output_accessor);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index f38dbd8..8116410 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,6 +67,7 @@ class WindowAggregationOperationState {
    *        tables.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+                                  std::vector<block_id> &&block_ids,
                                   const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -112,11 +113,11 @@ class WindowAggregationOperationState {
    * @param output_destination The output destination for the computed window
    *                           aggregate.
    **/
-  void windowAggregateBlocks(InsertDestination *output_destination,
-                             const std::vector<block_id> &block_ids);
+  void windowAggregateBlocks(InsertDestination *output_destination);
 
  private:
   const CatalogRelationSchema &input_relation_;
+  const std::vector<block_id> block_ids_;
 
   // TODO(Shixuan): Handle and State for window aggregation will be needed for
   //                actual calculation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index 4dc0a6a..c3b672c 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,6 +24,7 @@ import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
   required int32 input_relation_id = 1;
+  repeated fixed64 block_ids = 2;
   required WindowAggregateFunction function = 3;
   repeated Scalar arguments = 4;
   repeated Scalar partition_by_attributes = 5;


[2/3] incubator-quickstep git commit: Added handle for avg

Posted by sh...@apache.org.
Added handle for avg


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/29664050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/29664050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/29664050

Branch: refs/heads/SQL-window-aggregation
Commit: 2966405066b223d0079f8df3e231ed56750259ad
Parents: 7671a58
Author: shixuan-fan <sh...@apache.org>
Authored: Fri Jul 8 18:23:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Fri Jul 8 18:23:47 2016 +0000

----------------------------------------------------------------------
 expressions/CMakeLists.txt                      |   1 +
 expressions/window_aggregation/CMakeLists.txt   |  85 +++
 .../WindowAggregateFunction.cpp                 |  46 ++
 .../WindowAggregateFunction.hpp                 | 149 ++++++
 .../WindowAggregateFunction.proto               |  25 +
 .../WindowAggregateFunctionAvg.cpp              |  85 +++
 .../WindowAggregateFunctionAvg.hpp              |  75 +++
 .../WindowAggregateFunctionFactory.cpp          |  78 +++
 .../WindowAggregateFunctionFactory.hpp          | 102 ++++
 .../WindowAggregationHandle.hpp                 | 130 +++++
 .../WindowAggregationHandleAvg.cpp              | 305 +++++++++++
 .../WindowAggregationHandleAvg.hpp              | 113 ++++
 .../window_aggregation/WindowAggregationID.hpp  |  44 ++
 .../WindowAggregationHandleAvg_unittest.cpp     | 526 +++++++++++++++++++
 query_optimizer/CMakeLists.txt                  |   2 +
 query_optimizer/ExecutionGenerator.cpp          |   5 +-
 query_optimizer/expressions/CMakeLists.txt      |   2 +-
 .../expressions/WindowAggregateFunction.cpp     |   4 +-
 .../expressions/WindowAggregateFunction.hpp     |  10 +-
 query_optimizer/resolver/CMakeLists.txt         |   2 +
 query_optimizer/resolver/Resolver.cpp           |  40 +-
 query_optimizer/resolver/Resolver.hpp           |   5 +-
 relational_operators/CMakeLists.txt             |   1 +
 .../WindowAggregationOperator.cpp               |  10 +-
 .../WindowAggregationOperator.hpp               |  29 +-
 relational_operators/WorkOrder.proto            |   3 +-
 storage/CMakeLists.txt                          |  11 +-
 storage/WindowAggregationOperationState.cpp     |  54 +-
 storage/WindowAggregationOperationState.hpp     |  53 +-
 storage/WindowAggregationOperationState.proto   |  16 +-
 ...WindowAggregationOperationState_unittest.cpp |  14 +-
 31 files changed, 1897 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index 53ad5d4..6ef3c24 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(aggregation)
 add_subdirectory(predicate)
 add_subdirectory(scalar)
 add_subdirectory(table_generator)
+add_subdirectory(window_aggregation)
 
 QS_PROTOBUF_GENERATE_CPP(expressions_Expressions_proto_srcs
                          expressions_Expressions_proto_hdrs

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
new file mode 100644
index 0000000..fa4eae9
--- /dev/null
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -0,0 +1,85 @@
+#   Copyright 2011-2015 Quickstep Technologies LLC.
+#   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#   University of Wisconsin—Madison.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+QS_PROTOBUF_GENERATE_CPP(expressions_windowaggregation_WindowAggregateFunction_proto_srcs
+                         expressions_windowaggregation_WindowAggregateFunction_proto_hdrs
+                         WindowAggregateFunction.proto)
+
+# Declare micro-libs:
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction
+            WindowAggregateFunction.cpp
+            WindowAggregateFunction.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+            ${expressions_windowaggregation_WindowAggregateFunction_proto_srcs})
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+            WindowAggregateFunctionAvg.cpp
+            WindowAggregateFunctionAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+            WindowAggregateFunctionFactory.cpp
+            WindowAggregateFunctionFactory.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
+            ../../empty_src.cpp
+            WindowAggregationHandle.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+            WindowAggregationHandleAvg.cpp
+            WindowAggregationHandleAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationID
+            ../../empty_src.cpp
+            WindowAggregationID.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_HashTableBase
+                      quickstep_types_TypedValue
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.cpp b/expressions/window_aggregation/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..88ba0b9
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.cpp
@@ -0,0 +1,46 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+serialization::WindowAggregateFunction WindowAggregateFunction::getProto() const {
+  serialization::WindowAggregateFunction proto;
+  switch (win_agg_id_) {
+    case WindowAggregationID::kAvg:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
+      break;
+    default: {
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<std::underlying_type<WindowAggregationID>::type>(win_agg_id_);
+    }
+  }
+
+  return proto;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..9cc5d74
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -0,0 +1,149 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief A class representing a particular window aggregate function in the
+ *        abstract sense. Each named aggregate function is represented by a
+ *        singleton subclass of WindowAggregateFunction.
+ *
+ * WindowAggregateFunction provides informational methods about the
+ * applicability of a particular window aggregate function to particular
+ * argument Type(s). The actual implementation of the window aggregate
+ * functions' logic is in the WindowAggregationHandle class hierarchy, and can
+ * be different depending on the particular argument Type(s) given to the window
+ * aggregate. To perform a window aggregation, a caller should first call
+ * WindowAggregateFunction::createHandle() to instantiate a
+ * WindowAggregationHandle object, then use the methods of
+ * WindowAggregationHandle to do the actual window aggregation. Finally, delete
+ * the WindowAggregationHandle once finished.
+ * 
+ * See WindowAggregationHandle for more detailed information about how
+ * window aggregates are actually computed.
+ **/
+class WindowAggregateFunction {
+ public:
+  /**
+   * @brief Get the ID of this window aggregate (i.e. its unique ID amongst all
+   *        the WindowAggregateFunctions).
+   *
+   * @return The WindowAggregationID of this WindowAggregateFunction.
+   **/
+  inline WindowAggregationID getWindowAggregationID() const {
+    return win_agg_id_;
+  }
+
+  /**
+   * @brief Get the human-readable name of this WindowAggregateFunction.
+   *
+   * @return The human-readable name of this WindowAggregateFunction.
+   **/
+  virtual std::string getName() const = 0;
+
+  /**
+   * @brief Get the serialized protocol buffer representation of this
+   *        WindowAggregateFunction.
+   *
+   * @return A serialized protocol buffer representation of this
+   *         WindowAggregateFunction.
+   **/
+  virtual serialization::WindowAggregateFunction getProto() const;
+
+  /**
+   * @brief Determine if this WindowAggregateFunction can be applied to
+   *        arguments of particular Type(s).
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @return Whether this WindowAggregateFunction is applicable to the given
+   *         argument_types.
+   **/
+  virtual bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const = 0;
+
+  /**
+   * @brief Determine the result Type for this WindowAggregateFunction given
+   *        arguments of particular Type(s).
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @return The result Type for this WindowAggregateFunction applied to the
+   *         specified argument_types, or nullptr if this
+   *         WindowAggregateFunction is not applicable to the specified Type(s).
+   **/
+  virtual const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const = 0;
+
+  /**
+   * @brief Create a WindowAggregationHandle to compute aggregates.
+   *
+   * @warning It is an error to call this method for argument_types which this
+   *          WindowAggregateFunction can not apply to. For safety, check
+   *          canApplyToTypes() first.
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @param partition_key_types A list of zero or more Types (in order) for
+   *        the partition keys.
+   * @note The frame mode (ROWS or RANGE) and the number of preceding and
+   *       following rows/range are not parameters of this method.
+   *
+   * @return A new WindowAggregationHandle that can be used to compute this
+   *         WindowAggregateFunction over the specified argument_types. Caller
+   *         is responsible for deleting the returned object.
+   **/
+  virtual WindowAggregationHandle* createHandle(
+      std::vector<const Type*> &&argument_types,
+      std::vector<const Type*> &&partition_key_types) const = 0;
+
+ protected:
+  explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
+      : win_agg_id_(win_agg_id) {
+  }
+
+ private:
+  const WindowAggregationID win_agg_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.proto
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.proto b/expressions/window_aggregation/WindowAggregateFunction.proto
new file mode 100644
index 0000000..173ff0e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.proto
@@ -0,0 +1,25 @@
+//   Copyright 2015 Pivotal Software, Inc.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+message WindowAggregateFunction {
+  enum WindowAggregationID {
+    AVG = 0;
+  }
+
+  required WindowAggregationID window_aggregation_id = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
new file mode 100644
index 0000000..e9a4453
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -0,0 +1,85 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionAvg::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // AVG is unary.
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  // Argument must be addable and divisible.
+  return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+             .canApplyToTypes(*argument_types.front(), *argument_types.front())
+         && BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+             .canApplyToTypes(*argument_types.front(), TypeFactory::GetType(kDouble));
+}
+
+const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (!canApplyToTypes(argument_types)) {
+    return nullptr;
+  }
+
+  // The type used to sum values is nullable, and we automatically widen int to
+  // long and float to double to have more headroom when adding up many values.
+  const Type *sum_type = &(argument_types.front()->getNullableVersion());
+  switch (sum_type->getTypeID()) {
+    case kInt:
+      sum_type = &TypeFactory::GetType(kLong, true);
+      break;
+    case kFloat:
+      sum_type = &TypeFactory::GetType(kDouble, true);
+      break;
+    default:
+      break;
+  }
+
+  return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+             .resultTypeForArgumentTypes(*sum_type, TypeFactory::GetType(kDouble));
+}
+
+WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+    std::vector<const Type*> &&argument_types,
+    std::vector<const Type*> &&partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
+      << " that AVG can not be applied to.";
+
+  return new WindowAggregationHandleAvg(*argument_types.front(),
+                                        std::move(partition_key_types));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
new file mode 100644
index 0000000..18e1022
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL AVG() OVER term.
+ **/
+class WindowAggregateFunctionAvg : public WindowAggregateFunction {
+ public:
+  static const WindowAggregateFunctionAvg& Instance() {
+    static WindowAggregateFunctionAvg instance;
+    return instance;
+  }
+
+  std::string getName() const override {
+    return "AVG";
+  }
+
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+
+  WindowAggregationHandle* createHandle(
+      std::vector<const Type*> &&argument_types,
+      std::vector<const Type*> &&partition_key_types) const override;
+
+ private:
+  WindowAggregateFunctionAvg()
+      : WindowAggregateFunction(WindowAggregationID::kAvg) {
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionAvg);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
new file mode 100644
index 0000000..afd53ef
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
@@ -0,0 +1,78 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+
+#include <string>
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::Get(
+    const WindowAggregationID agg_id) {
+  switch (agg_id) {
+    case WindowAggregationID::kAvg:
+      return WindowAggregateFunctionAvg::Instance();
+    default: {
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<std::underlying_type<WindowAggregationID>::type>(agg_id);
+    }
+  }
+}
+
+const WindowAggregateFunction* WindowAggregateFunctionFactory::GetByName(
+    const std::string &name) {
+  if (name == "avg") {
+    return &WindowAggregateFunctionAvg::Instance();
+  } else {
+    return nullptr;
+  }
+}
+
+bool WindowAggregateFunctionFactory::ProtoIsValid(
+    const serialization::WindowAggregateFunction &proto) {
+  return proto.IsInitialized()
+         && serialization::WindowAggregateFunction::WindowAggregationID_IsValid(proto.window_aggregation_id());
+}
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::ReconstructFromProto(
+    const serialization::WindowAggregateFunction &proto) {
+  DCHECK(ProtoIsValid(proto))
+      << "Attempted to reconstruct an WindowAggregateFunction from an invalid proto:\n"
+      << proto.DebugString();
+
+  switch (proto.window_aggregation_id()) {
+    case serialization::WindowAggregateFunction::AVG:
+      return WindowAggregateFunctionAvg::Instance();
+    default: {
+      LOG(FATAL) << "Unrecognized serialization::WindowAggregateFunction::WindowAggregationID: "
+                 << proto.window_aggregation_id()
+                 << "\nFull proto debug string:\n"
+                 << proto.DebugString();
+    }
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
new file mode 100644
index 0000000..2254482
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
@@ -0,0 +1,102 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+
+#include <string>
+
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregateFunction;
+namespace serialization { class WindowAggregateFunction; }
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief All-static factory class that provides access to the various concrete
+ *        implementations of WindowAggregateFunction.
+ *
+ * WindowAggregateFunctionFactory allows client code to use any
+ * WindowAggregateFunction in Quickstep in a generic way without having to know
+ * about all the specific subclasses of WindowAggregateFunction. In particular,
+ * it is used to deserialize WindowAggregateFunctions used in
+ * WindowAggregationOperationState from their protobuf representations
+ * (originally created by the optimizer) when deserializing a QueryContext.
+ **/
+class WindowAggregateFunctionFactory {
+ public:
+  /**
+   * @brief Get a particular WindowAggregateFunction by its ID.
+   *
+   * @param agg_id The ID of the desired WindowAggregateFunction.
+   * @return A reference to the singleton instance of the
+   *         WindowAggregateFunction specified by agg_id.
+   **/
+  static const WindowAggregateFunction& Get(const WindowAggregationID agg_id);
+
+  /**
+   * @brief Get a particular WindowAggregateFunction by its name in SQL syntax.
+   *
+   * @note The lookup is an exact string comparison, so the caller is
+   *       responsible for lower-casing the name beforehand. At present only
+   *       "avg" is recognized.
+   *
+   * @param name The name of the desired WindowAggregateFunction in lower case.
+   * @return A pointer to the WindowAggregateFunction specified by name, or
+   *         nullptr if name does not match any known WindowAggregateFunction.
+   **/
+  static const WindowAggregateFunction* GetByName(const std::string &name);
+
+  /**
+   * @brief Determine if a serialized protobuf representation of a
+   *        WindowAggregateFunction is fully-formed and valid.
+   *
+   * A proto is considered valid when it is initialized (all required fields
+   * are set) and its window_aggregation_id is a valid enum value.
+   *
+   * @param proto A serialized protobuf representation of a
+   *              WindowAggregateFunction to check for validity.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::WindowAggregateFunction &proto);
+
+  /**
+   * @brief Get the WindowAggregateFunction represented by a proto.
+   *
+   * @warning It is an error to call this method with an invalid proto.
+   *          ProtoIsValid() should be called first to check. Validity is
+   *          only asserted with DCHECK here, and a window_aggregation_id
+   *          that has no corresponding function is a fatal error
+   *          (LOG(FATAL)).
+   *
+   * @param proto A serialized protobuf representation of a
+   *              WindowAggregateFunction.
+   * @return The WindowAggregateFunction represented by proto.
+   **/
+  static const WindowAggregateFunction& ReconstructFromProto(
+      const serialization::WindowAggregateFunction &proto);
+
+ private:
+  // Class is all-static and can not be instantiated: the constructor is
+  // private and copying is disabled.
+  WindowAggregateFunctionFactory();
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionFactory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
new file mode 100644
index 0000000..77b1e76
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -0,0 +1,130 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class ColumnVector;
+class InsertDestinationInterface;
+class Scalar;
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregationHandle encapsulates logic for actually computing
+ *        window aggregates with particular argument(s).
+ * @note See also WindowAggregateFunction, which represents a SQL aggregate
+ *       function in the abstract sense.
+ *
+ * A WindowAggregationHandle is created by calling
+ * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
+ * provides methods that are used to actually compute the window aggregate,
+ * storing intermediate results in WindowAggregationState objects.
+ *
+ * The work flow for computing a window aggregate is:
+ *     1. Create an initial state by createInitialState().
+ *     2. One thread will handle all the computation, iterating from the first
+ *        tuple to the last tuple. Note there will be two modes that could be
+ *        used upon different situations:
+ *        a. If the window aggregate is defined as accumulative, which are:
+ *           i.  Functions applied to whole partition, such as rank(), ntile()
+ *               and dense_rank().
+ *           ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
+ *               AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
+ *               FOLLOWING".
+ *           Then, for functions except median, we could store some global
+ *           values in the state without keeping all the tuple values around.
+ *        b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ *           3 FOLLOWING", we have to store all the tuples in the state so that
+ *           we could know which values should be dropped as the window slides.
+ *        For each computed value, generate a tuple and store it in the
+ *        column vector.
+ *     3. Insert the new column into the original relation and return.
+ *
+ * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
+ * parallelization is to calculate the partial result inside each block. Each
+ * block could visit the following blocks as long as the block's last partition
+ * is not finished. WindowAggregationOperationState will be used for handling
+ * the global state of the calculation.
+ **/
+ 
+class WindowAggregationHandle {
+ public:
+  /**
+   * @brief Destructor.
+   **/
+  virtual ~WindowAggregationHandle() {}
+
+  /**
+   * @brief Calculate the window aggregate result.
+   *
+   * @param arguments The argument expression(s) of the window aggregate
+   *                  function.
+   * @param block_ids The IDs of the blocks that hold the input tuples, in
+   *                  the order in which they should be scanned.
+   * @param partition_by_ids The attribute_id of the partition keys in the
+   *                         input relation.
+   * @param relation The schema of the relation that the blocks in block_ids
+   *                 belong to; used when loading blocks from the storage
+   *                 manager.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *                      row (WindowAggregationHandleAvg treats -1 as
+   *                      unbounded).
+   * @param num_following The number of rows/range that follows the current
+   *                      row (WindowAggregationHandleAvg treats -1 as
+   *                      unbounded).
+   * @param storage_manager A pointer to the storage manager.
+   * @param output_destination The destination for output.
+   **/
+  virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                         const std::vector<block_id> &block_ids,
+                         const std::vector<attribute_id> &partition_by_ids,
+                         const CatalogRelationSchema &relation,
+                         const bool is_row,
+                         const std::int64_t num_preceding,
+                         const std::int64_t num_following,
+                         StorageManager *storage_manager,
+                         InsertDestinationInterface *output_destination) const = 0;
+
+ protected:
+  /**
+   * @brief Constructor.
+   *
+   * Takes no arguments; everything needed for a computation is passed to
+   * calculate(). Protected so that only concrete subclasses can be
+   * instantiated.
+   **/
+  WindowAggregationHandle() {}
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
new file mode 100644
index 0000000..62f5a88
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -0,0 +1,305 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "storage/InsertDestinationInterface.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+// Sets up the types and operators needed to average values of the given
+// argument type, plus one equality comparator per partition key type.
+WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const Type &type,
+    std::vector<const Type*> &&partition_key_types)
+    : argument_type_(type) {
+  // Widen the accumulator (Int -> Long, Float -> Double) so that summing many
+  // values has more headroom. Any other type is accumulated as itself.
+  const TypeID argument_type_id = type.getTypeID();
+  TypeID sum_type_id = argument_type_id;
+  if (argument_type_id == kInt || argument_type_id == kLong) {
+    sum_type_id = kLong;
+  } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+    sum_type_id = kDouble;
+  }
+  sum_type_ = &(TypeFactory::GetType(sum_type_id));
+
+  const auto &double_type = TypeFactory::GetType(kDouble);
+  const auto &add_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd);
+  const auto &divide_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+
+  // The result must be nullable: AVG() over 0 values (or all NULL values) is
+  // NULL.
+  result_type_ =
+      &(divide_operation.resultTypeForArgumentTypes(*sum_type_, double_type)
+            ->getNullableVersion());
+
+  // Operator that folds one argument value into the running sum.
+  fast_add_operator_.reset(
+      add_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
+  // Operator that divides the sum by the (double) count to get the average.
+  divide_operator_.reset(
+      divide_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_, double_type));
+
+  // One equality comparator per partition key, used to decide whether two
+  // tuples belong to the same partition.
+  const auto &equal_comparison =
+      ComparisonFactory::GetComparison(ComparisonID::kEqual);
+  for (const Type *key_type : partition_key_types) {
+    equal_comparators_.emplace_back(
+        equal_comparison.makeUncheckedComparatorForTypes(*key_type, *key_type));
+  }
+}
+
+// Computes AVG over a window for every tuple in the given blocks and inserts
+// each input tuple, extended with its window aggregate as an extra trailing
+// value, into output_destination.
+//
+// arguments must contain exactly one scalar (the AVG argument) and block_ids
+// must not be empty. Blocks are processed in the order given by block_ids.
+void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                                           const std::vector<block_id> &block_ids,
+                                           const std::vector<attribute_id> &partition_by_ids,
+                                           const CatalogRelationSchema &relation,
+                                           const bool is_row,
+                                           const std::int64_t num_preceding,
+                                           const std::int64_t num_following,
+                                           StorageManager *storage_manager,
+                                           InsertDestinationInterface *output_destination) const {
+  DCHECK(arguments.size() == 1);
+  DCHECK(!block_ids.empty());
+
+  // Owners for everything the raw-pointer vectors below refer to. The block
+  // references are declared first so that they are destroyed last, i.e. the
+  // blocks stay referenced for as long as any accessor into them exists.
+  std::vector<BlockReference> blocks;
+  std::vector<std::unique_ptr<ValueAccessor>> owned_tuple_accessors;
+  std::vector<std::unique_ptr<ColumnVectorsValueAccessor>> owned_argument_accessors;
+  blocks.reserve(block_ids.size());
+  owned_tuple_accessors.reserve(block_ids.size());
+  owned_argument_accessors.reserve(block_ids.size());
+
+  // Non-owning views handed to calculateOneWindow(). The index of each
+  // accessor is the index of its block in block_ids.
+  std::vector<ValueAccessor*> tuple_accessors;
+  std::vector<ColumnVectorsValueAccessor*> argument_accessors;
+  tuple_accessors.reserve(block_ids.size());
+  argument_accessors.reserve(block_ids.size());
+
+  for (const block_id bid : block_ids) {
+    // Get tuple accessor.
+    blocks.push_back(storage_manager->getBlock(bid, relation));
+    BlockReference &block = blocks.back();
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    owned_tuple_accessors.emplace_back(tuple_block.createValueAccessor());
+    ValueAccessor *tuple_accessor = owned_tuple_accessors.back().get();
+    tuple_accessors.push_back(tuple_accessor);
+
+    // Get argument accessor. It is heap-allocated because a pointer to it is
+    // kept and used after this loop iteration has finished; a loop-local
+    // object would leave a dangling pointer in argument_accessors.
+    owned_argument_accessors.emplace_back(new ColumnVectorsValueAccessor());
+    ColumnVectorsValueAccessor *argument_accessor =
+        owned_argument_accessors.back().get();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    argument_accessor->addColumn(
+        arguments.front()->getAllValues(tuple_accessor, &sub_block_ref));
+    argument_accessors.push_back(argument_accessor);
+  }
+
+  // Create a window for each tuple and calculate the window aggregate.
+  for (std::uint32_t current_block_index = 0;
+       current_block_index < block_ids.size();
+       ++current_block_index) {
+    ColumnVectorsValueAccessor *argument_accessor =
+        argument_accessors[current_block_index];
+
+    InvokeOnAnyValueAccessor(
+        tuple_accessors[current_block_index],
+        [&](auto *typed_tuple_accessor) -> void {
+      typed_tuple_accessor->beginIteration();
+      argument_accessor->beginIteration();
+
+      while (typed_tuple_accessor->next() && argument_accessor->next()) {
+        TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+                                                               argument_accessors,
+                                                               partition_by_ids,
+                                                               current_block_index,
+                                                               is_row,
+                                                               num_preceding,
+                                                               num_following);
+
+        // Take ownership of the tuple so it is released after being copied.
+        // NOTE(review): assumes getTuple() returns a newly allocated Tuple
+        // owned by the caller -- confirm against the ValueAccessor API.
+        std::unique_ptr<Tuple> current_tuple(typed_tuple_accessor->getTuple());
+
+        // Output tuple = all original values followed by the aggregate.
+        std::vector<TypedValue> new_tuple;
+        for (const TypedValue &value : *current_tuple) {
+          new_tuple.push_back(value);
+        }
+        new_tuple.push_back(std::move(window_aggregate));
+
+        output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+      }
+    });
+  }
+}
+
+// Computes the average over the window of the row that the accessors of
+// block current_block_index are currently positioned on.
+//
+// The window consists of the current row, up to num_preceding rows before it
+// and up to num_following rows after it (-1 means unbounded), never crossing
+// a partition boundary. Neighbouring rows may live in adjacent blocks, which
+// is why the accessors of all blocks are passed in.
+//
+// is_row is currently not consulted: the frame is always counted in rows.
+TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    std::vector<ValueAccessor*> &tuple_accessors,
+    std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+    const std::vector<attribute_id> &partition_by_ids,
+    const std::uint32_t current_block_index,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  // Accessors of the block that contains the current row. These are kept
+  // untouched; each scan below walks with its own pair of accessor pointers.
+  ValueAccessor *current_tuple_accessor = tuple_accessors[current_block_index];
+  ColumnVectorsValueAccessor *current_argument_accessor =
+      argument_accessors[current_block_index];
+
+  // Seed the running sum and count with the current row itself.
+  TypedValue sum = sum_type_->makeZeroValue();
+  sum = fast_add_operator_->applyToTypedValues(
+      sum, current_argument_accessor->getTypedValue(0));
+  std::uint64_t count = 1;
+
+  // Get the partition key for the current row.
+  std::vector<TypedValue> current_row_partition_key;
+  current_row_partition_key.reserve(partition_by_ids.size());
+  for (const attribute_id partition_by_id : partition_by_ids) {
+    current_row_partition_key.push_back(
+        current_tuple_accessor->getTypedValueVirtual(partition_by_id));
+  }
+
+  // Get current position.
+  const tuple_id current_tuple_id =
+      current_tuple_accessor->getCurrentPositionVirtual();
+
+  // Find preceding tuples.
+  std::int64_t count_preceding = 0;
+  tuple_id preceding_tuple_id = current_tuple_id;
+  std::uint32_t preceding_block_index = current_block_index;
+  ValueAccessor *preceding_tuple_accessor = current_tuple_accessor;
+  ColumnVectorsValueAccessor *preceding_argument_accessor =
+      current_argument_accessor;
+  while (num_preceding == -1 || count_preceding < num_preceding) {
+    preceding_tuple_id--;
+
+    // If the preceding tuple locates in the previous block, move to the
+    // previous block and continue searching.
+    // TODO(Shixuan): If it is possible to have empty blocks, "if" has to be
+    // changed to "while".
+    if (preceding_tuple_id < 0) {
+      // First tuple of the first block, no more preceding blocks. The check
+      // is done before decrementing so that it also works for an unsigned
+      // block index.
+      if (preceding_block_index == 0) {
+        break;
+      }
+      --preceding_block_index;
+
+      preceding_tuple_accessor = tuple_accessors[preceding_block_index];
+      preceding_argument_accessor = argument_accessors[preceding_block_index];
+      preceding_tuple_id = preceding_argument_accessor->getNumTuples() - 1;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, end searching preceding tuples.
+    if (!samePartition(current_row_partition_key,
+                       preceding_tuple_accessor,
+                       preceding_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    count_preceding++;
+    sum = fast_add_operator_->applyToTypedValues(
+              sum,
+              preceding_argument_accessor->getTypedValueAtAbsolutePosition(
+                  0, preceding_tuple_id));
+  }
+
+  count += count_preceding;
+
+  // Find following tuples. The scan restarts from the current row's block;
+  // it must not continue from wherever the preceding scan ended up.
+  std::int64_t count_following = 0;
+  tuple_id following_tuple_id = current_tuple_id;
+  std::uint32_t following_block_index = current_block_index;
+  ValueAccessor *following_tuple_accessor = current_tuple_accessor;
+  ColumnVectorsValueAccessor *following_argument_accessor =
+      current_argument_accessor;
+  while (num_following == -1 || count_following < num_following) {
+    following_tuple_id++;
+
+    // If the following tuple locates in the next block, move to the next block
+    // and continue searching.
+    // TODO(Shixuan): If it is possible to have empty blocks, "if" has to be
+    // changed to "while".
+    if (following_tuple_id >= following_argument_accessor->getNumTuples()) {
+      following_block_index++;
+      // Last tuple of the last block, no more following blocks.
+      if (following_block_index >= tuple_accessors.size()) {
+        break;
+      }
+
+      following_tuple_accessor = tuple_accessors[following_block_index];
+      following_argument_accessor = argument_accessors[following_block_index];
+      following_tuple_id = 0;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, end searching following tuples.
+    if (!samePartition(current_row_partition_key,
+                       following_tuple_accessor,
+                       following_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    count_following++;
+    sum = fast_add_operator_->applyToTypedValues(
+              sum,
+              following_argument_accessor->getTypedValueAtAbsolutePosition(
+                  0, following_tuple_id));
+  }
+
+  count += count_following;
+
+  // average = sum / count, with the count converted to double.
+  return divide_operator_->applyToTypedValues(sum,
+                                              TypedValue(static_cast<double>(count)));
+}
+
+// Returns true iff the tuple at absolute position boundary_tuple_id in
+// tuple_accessor has the same value as current_row_partition_key for every
+// partition attribute. With no partition attributes the result is true.
+bool WindowAggregationHandleAvg::samePartition(
+    const std::vector<TypedValue> &current_row_partition_key,
+    ValueAccessor *tuple_accessor,
+    const tuple_id boundary_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  return InvokeOnAnyValueAccessor(
+      tuple_accessor,
+      [&](auto *accessor) -> bool {
+    const std::size_t num_keys = partition_by_ids.size();
+    for (std::size_t key_pos = 0; key_pos < num_keys; ++key_pos) {
+      // Compare the key_pos-th partition key of the current row against the
+      // same attribute of the candidate tuple.
+      const bool keys_equal = equal_comparators_[key_pos]->compareTypedValues(
+          current_row_partition_key[key_pos],
+          accessor->getTypedValueAtAbsolutePosition(partition_by_ids[key_pos],
+                                                    boundary_tuple_id));
+      if (!keys_equal) {
+        return false;
+      }
+    }
+
+    return true;
+  });
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
new file mode 100644
index 0000000..115152e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -0,0 +1,113 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class ColumnVectorsValueAccessor;
+class InsertDestinationInterface;
+class StorageManager;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief A WindowAggregationHandle for average.
+ **/
+class WindowAggregationHandleAvg : public WindowAggregationHandle {
+ public:
+  ~WindowAggregationHandleAvg() override {}
+
+  /**
+   * @brief Calculate AVG over a window for every tuple of the given blocks.
+   *
+   * Each input tuple, extended with its window aggregate as an additional
+   * trailing value, is inserted into output_destination.
+   *
+   * @param arguments The argument of AVG; must contain exactly one Scalar.
+   * @param block_ids The IDs of the blocks that hold the input tuples; must
+   *                  not be empty.
+   * @param partition_by_ids The attribute_id of the partition keys in the
+   *                         input relation.
+   * @param relation The schema of the relation the blocks belong to.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *                      row, -1 for unbounded.
+   * @param num_following The number of rows/range that follows the current
+   *                      row, -1 for unbounded.
+   * @param storage_manager A pointer to the storage manager.
+   * @param output_destination The destination for output.
+   **/
+  void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                 const std::vector<block_id> &block_ids,
+                 const std::vector<attribute_id> &partition_by_ids,
+                 const CatalogRelationSchema &relation,
+                 const bool is_row,
+                 const std::int64_t num_preceding,
+                 const std::int64_t num_following,
+                 StorageManager *storage_manager,
+                 InsertDestinationInterface *output_destination) const override;
+
+ private:
+  friend class WindowAggregateFunctionAvg;
+
+  /**
+   * @brief Constructor.
+   *
+   * @param type Type of the argument values being averaged.
+   * @param partition_key_types The types of the partition keys; one equality
+   *                            comparator is created per entry.
+   **/
+  explicit WindowAggregationHandleAvg(const Type &type,
+                                      std::vector<const Type*> &&partition_key_types);
+
+  /**
+   * @brief Compute the average over the window of the row that the accessors
+   *        of block current_block_index are currently positioned on.
+   *
+   * @param tuple_accessors One tuple accessor per input block.
+   * @param argument_accessors One accessor over the argument values per
+   *                           input block, parallel to tuple_accessors.
+   * @param partition_by_ids The attribute_id of the partition keys.
+   * @param current_block_index Index (into the accessor vectors) of the block
+   *                            containing the current row.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding Maximum number of preceding rows, -1 for unbounded.
+   * @param num_following Maximum number of following rows, -1 for unbounded.
+   * @return The sum of the window's values divided by the number of rows in
+   *         the window.
+   **/
+  TypedValue calculateOneWindow(
+      std::vector<ValueAccessor*> &tuple_accessors,
+      std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+      const std::vector<attribute_id> &partition_by_ids,
+      const std::uint32_t current_block_index,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const;
+
+  /**
+   * @brief Check whether a tuple belongs to the same partition as the
+   *        current row.
+   *
+   * @param current_row_partition_key Partition key values of the current row.
+   * @param tuple_accessor Accessor of the block containing the tuple to test.
+   * @param boundary_tuple_id Absolute position of the tuple to test.
+   * @param partition_by_ids The attribute_id of the partition keys.
+   * @return True if every partition key of the tuple equals the
+   *         corresponding value in current_row_partition_key.
+   **/
+  bool samePartition(const std::vector<TypedValue> &current_row_partition_key,
+                     ValueAccessor *tuple_accessor,
+                     const tuple_id boundary_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  // Type of the values being averaged.
+  const Type &argument_type_;
+  // Type of the running sum (Int/Long -> Long, Float/Double -> Double).
+  const Type *sum_type_;
+  // Nullable result type of dividing the sum by a double count.
+  const Type *result_type_;
+  // sum_type_ + argument_type_: folds one value into the running sum.
+  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  // sum_type_ / double: turns sum and count into the average.
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+  // One equality comparator per partition key, in partition key order.
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationID.hpp b/expressions/window_aggregation/WindowAggregationID.hpp
new file mode 100644
index 0000000..74c948f
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationID.hpp
@@ -0,0 +1,44 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief The possible types of window aggregations.
+ **/
+// Identifies a kind of window aggregate function.
+// NOTE(review): in the visible factory code only kAvg is mapped to an
+// implementation (WindowAggregateFunctionAvg); confirm whether the other IDs
+// are handled elsewhere.
+enum class WindowAggregationID {
+  kAvg,
+  kCount,
+  kMin,
+  kMax,
+  kSum
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..6a7d161
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,526 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageManager.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+  constexpr int kNumSamples = 100;
+
+}  // namespace
+
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+  // Creates the AVG window-aggregation handle for 'argument_type' and 'partition_key_types'.
+  void initializeHandle(const Type &argument_type,
+                        const std::vector<const Type*> &partition_key_types) {
+    const WindowAggregateFunction &function =
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
+                                            partition_key_types));
+  }
+
+  // Returns whether AVG accepts a single argument of 'typeID' (Char/VarChar are built with length 10).
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const Type &type = (typeID == kChar || typeID == kVarChar) ?
+        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
+        TypeFactory::GetType(typeID);
+
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+        std::vector<const Type*>(1, &type));
+  }
+
+  // Returns true if AVG over a single 'input_type_id' argument yields a result of 'output_type_id'.
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                            TypeID output_type_id) {
+    const Type *result_type
+        = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    return (result_type->getTypeID() == output_type_id);
+  }
+
+  template <typename CppType>
+  static void CheckAvgValue(
+      CppType expected,
+      const AggregationHandle &handle,
+      const AggregationState &state) {
+    EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+  }
+
+  // Static templated helper that stores the integer 'value' into '*data'.
+  template <typename CppType>
+  static void SetDataType(int value, CppType *data) {
+    *data = value;
+  }
+
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype val;
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+
+    iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
+      sum += val;
+    }
+    iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+    CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+                                                *aggregation_handle_avg_,
+                                                *aggregation_handle_avg_state_);
+
+    // Test mergeStates().
+    std::unique_ptr<AggregationState> merge_state(
+        aggregation_handle_avg_->createInitialState());
+    aggregation_handle_avg_->mergeStates(*merge_state,
+                                         aggregation_handle_avg_state_.get());
+
+    iterateHandle(merge_state.get(), type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      iterateHandle(merge_state.get(), type.makeValue(&val));
+      sum += val;
+    }
+
+    aggregation_handle_avg_->mergeStates(*merge_state,
+                                         aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / (2 * kNumSamples),
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+
+  template <typename GenericType>
+  ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
+    NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
+
+    typename GenericType::cpptype val;
+    SetDataType(0, sum);
+
+    column->appendTypedValue(type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      column->appendTypedValue(type.makeValue(&val));
+      *sum += val;
+      // One NULL in the middle.
+      if (i == kNumSamples/2) {
+        column->appendTypedValue(type.makeNullValue());
+      }
+    }
+    column->appendTypedValue(type.makeNullValue());
+
+    return column;
+  }
+
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGenericColumnVector() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
+    column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
+
+    std::unique_ptr<AggregationState> cv_state(
+        aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
+
+    // Test the state generated directly by accumulateColumnVectors(), and also
+    // test after merging back.
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *cv_state);
+
+    aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGenericValueAccessor() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+    accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
+
+    std::unique_ptr<AggregationState> va_state(
+        aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
+                                                         std::vector<attribute_id>(1, 0)));
+
+    // Test the state generated directly by accumulateValueAccessor(), and also
+    // test after merging back.
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *va_state);
+
+    aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+  std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
+  std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
+};
+
+// kNumSamples is a namespace-scope constexpr (declared above), so no out-of-class definition is needed.
+
+// Specialization for double results: the expected and finalized values are
+// compared with EXPECT_DOUBLE_EQ instead of the EXPECT_EQ used by the generic version.
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValue<double>(
+    double expected, const AggregationHandle &handle, const AggregationState &state) {
+  EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral<double>());
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+  data->interval_ticks = value;  // Writes 'value' into the interval's tick count.
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+  data->months = value;  // Writes 'value' into the interval's month count.
+}
+
+typedef WindowAggregationHandleAvgTest AggregationHandleAvgDeathTest;  // Fixture alias used by the death tests.
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkAggregationAvgGeneric<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkAggregationAvgGeneric<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkAggregationAvgGeneric<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkAggregationAvgGeneric<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+  checkAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+  checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+TEST_F(WindowAggregationHandleAvgTest, IntTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+}
+#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+#ifdef QUICKSTEP_DEBUG
+TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+
+TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+
+TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
+  const Type &int_non_null_type = IntType::Instance(false);
+  const Type &long_type = LongType::Instance(true);
+  const Type &double_type = DoubleType::Instance(true);
+  const Type &float_type = FloatType::Instance(true);
+  const Type &char_type = CharType::Instance(true, 10);
+  const Type &varchar_type = VarCharType::Instance(true, 10);
+
+  initializeHandle(IntType::Instance(true));
+  int int_val = 0;
+  std::int64_t long_val = 0;
+  double double_val = 0;
+  float float_val = 0;
+
+  iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
+
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+
+  // Test mergeStates() with states produced by handles of a different argument type.
+  std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
+      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+          std::vector<const Type*>(1, &double_type)));
+  std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
+      aggregation_handle_avg_double->createInitialState());
+  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
+      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
+      double_type.makeValue(&double_val));
+  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
+                                                    aggregation_handle_avg_state_.get()),
+               "");
+
+  std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
+      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+          std::vector<const Type*>(1, &float_type)));
+  std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
+      aggregation_handle_avg_float->createInitialState());
+  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
+      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
+      float_type.makeValue(&float_val));
+  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
+                                                    aggregation_handle_avg_state_.get()),
+               "");
+}
+#endif  // QUICKSTEP_DEBUG
+
+TEST_F(WindowAggregationHandleAvgTest, canApplyToTypeTest) {
+  EXPECT_TRUE(CanApplyToTypesTest(kInt));
+  EXPECT_TRUE(CanApplyToTypesTest(kLong));
+  EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+  EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+  EXPECT_FALSE(CanApplyToTypesTest(kChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+  EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));
+  EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgTest, GroupByTableMergeTestAvg) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_avg_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          source_hash_table.get());
+
+  AggregationHandleAvg *aggregation_handle_avg_derived =
+      static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
+  // We create three keys: the first is present in both hash tables, the second
+  // is present only in the source hash table, and the third is present only in
+  // the destination hash table.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_avg = 355;
+  TypedValue common_key_source_avg_val(common_key_source_avg);
+
+  const std::int64_t common_key_destination_avg = 295;
+  TypedValue common_key_destination_avg_val(common_key_destination_avg);
+
+  const std::int64_t exclusive_key_source_avg = 1;
+  TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
+
+  const std::int64_t exclusive_key_destination_avg = 1;
+  TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
+
+  std::unique_ptr<AggregationStateAvg> common_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> common_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+
+  // Create avg value states for keys.
+  aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_avg_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckAvgValue<double>(
+      (common_key_destination_avg_val.getLiteral<std::int64_t>() +
+          common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
+      *aggregation_handle_avg_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_avg_derived,
+                  *(destination_hash_table_derived->getSingleCompositeKey(
+                      exclusive_destination_key)));
+  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_avg_derived,
+                  *(source_hash_table_derived->getSingleCompositeKey(
+                      exclusive_source_key)));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_ExecutionHeuristics

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
   // Get input.
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+  window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
   const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new WindowAggregationOperator(query_handle_->query_id(),
+                                        *input_relation_info->relation,
                                         *output_relation,
                                         window_aggr_state_index,
                                         insert_destination_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       glog
-                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_Expression

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
 #include <utility>
 #include <vector>
 
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
 }
 
 WindowAggregateFunctionPtr WindowAggregateFunction::Create(
-    const ::quickstep::AggregateFunction &window_aggregate,
+    const ::quickstep::WindowAggregateFunction &window_aggregate,
     const std::vector<ScalarPtr> &arguments,
     const WindowInfo &window_info,
     const std::string &window_name,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..279dfa1 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,7 +33,7 @@
 
 namespace quickstep {
 
-class AggregateFunction;
+class WindowAggregateFunction;
 class Type;
 
 namespace optimizer {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
    * @return The WindowAggregateFunction singleton (from the expression system)
    *         for this node.
    **/
-  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+  inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
     return window_aggregate_;
   }
 
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
    * @param is_distinct Whether this is a DISTINCT aggregation.
    * @return A new AggregateFunctionPtr.
    **/
-  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+  static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
                                            const std::vector<ScalarPtr> &arguments,
                                            const WindowInfo &window_info,
                                            const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
    * @param window_info The window info of the window aggregate function.
    * @param is_distinct Indicates whether this is a DISTINCT aggregation.
    */
-  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+  WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
                           const std::vector<ScalarPtr> &arguments,
                           const WindowInfo &window_info,
                           const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
   // window_aggregate_. If it really needs to be seperated from the
   // AggregationFunction, a new class for WindowAggregationFunction should be
   // created as quickstep::WindowAggregateFunction.
-  const ::quickstep::AggregateFunction &window_aggregate_;
+  const ::quickstep::WindowAggregateFunction &window_aggregate_;
   std::vector<ScalarPtr> arguments_;
   const WindowInfo window_info_;
   const std::string window_name_;