You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/08 15:26:18 UTC

[1/6] incubator-quickstep git commit: Remove unused vector_based HashJoin collector type [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation 3901f8a2d -> 9a5ecd288 (forced update)


Remove unused vector_based HashJoin collector type


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/33470032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/33470032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/33470032

Branch: refs/heads/SQL-window-aggregation
Commit: 33470032a946a5a216df931b56be0eb8c6bfa0c4
Parents: 5c4e8db
Author: Navneet Potti <na...@gmail.com>
Authored: Mon Jun 27 11:00:04 2016 -0500
Committer: Navneet Potti <na...@cs.wisc.edu>
Committed: Wed Jun 29 12:54:09 2016 -0500

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp | 116 +------------------------
 relational_operators/HashJoinOperator.hpp |   3 -
 2 files changed, 2 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/33470032/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 5a47b50..667df1e 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -61,25 +61,9 @@ namespace quickstep {
 
 namespace {
 
-DEFINE_bool(vector_based_joined_tuple_collector, false,
-            "If true, use simple vector-based joined tuple collector in "
-            "hash join, with a final sort pass to group joined tuple pairs "
-            "by inner block. If false, use unordered_map based collector that "
-            "keeps joined pairs grouped by inner block as they are found "
-            "(this latter option has exhibited performance/scaling problems, "
-            "particularly in NUMA contexts).");
-
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
-// tuples from the inner relation. This version stores matching tuple ID pairs
+// tuples from the inner relation. It stores matching tuple ID pairs
 // in an unordered_map keyed by inner block ID.
-//
-// NOTE(chasseur): Performance testing has shown that this particular
-// implementation has problems scaling in a multisocket NUMA machine.
-// Additional benchmarking revealed problems using the STL unordered_map class
-// in a NUMA system (at least for the implementation in GNU libstdc++), even
-// though instances of this class and the internal unordered_map are private to
-// a single thread. Because of this, VectorBasedJoinedTupleCollector is used by
-// default instead.
 class MapBasedJoinedTupleCollector {
  public:
   MapBasedJoinedTupleCollector() {
@@ -91,13 +75,6 @@ class MapBasedJoinedTupleCollector {
     joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
   }
 
-  // Consolidation is a no-op for this version, but we provide this trivial
-  // call so that MapBasedJoinedTupleCollector and
-  // VectorBasedJoinedTupleCollector have the same interface and can both be
-  // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method.
-  inline void consolidate() const {
-  }
-
   // Get a mutable pointer to the collected map of joined tuple ID pairs. The
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the
@@ -116,82 +93,6 @@ class MapBasedJoinedTupleCollector {
   std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
 };
 
-// Compare std::pair instances based on their first element only.
-template <typename PairT>
-inline bool CompareFirst(const PairT &left, const PairT &right) {
-  return left.first < right.first;
-}
-
-// Functor passed to HashTable::getAllFromValueAccessor() to collect matching
-// tuples from the inner relation. This version stores inner block ID and pairs
-// of joined tuple IDs in an unsorted vector, which should then be sorted with
-// a call to consolidate() before materializing join output.
-//
-// NOTE(chasseur): Because of NUMA scaling issues for
-// MapBasedJoinedTupleCollector noted above, this implementation is the
-// default.
-class VectorBasedJoinedTupleCollector {
- public:
-  VectorBasedJoinedTupleCollector() {
-  }
-
-  template <typename ValueAccessorT>
-  inline void operator()(const ValueAccessorT &accessor,
-                         const TupleReference &tref) {
-    joined_tuples_.emplace_back(tref.block,
-                                std::make_pair(tref.tuple, accessor.getCurrentPosition()));
-  }
-
-  // Sorts joined tuple pairs by inner block ID. Must be called before
-  // getJoinedTuples().
-  void consolidate() {
-    if (joined_tuples_.empty()) {
-      return;
-    }
-
-    // Sort joined tuple_id pairs by inner block_id.
-    std::sort(joined_tuples_.begin(),
-              joined_tuples_.end(),
-              CompareFirst<std::pair<block_id, std::pair<tuple_id, tuple_id>>>);
-
-    // Make a single vector of joined block_id pairs for each inner block for
-    // compatibility with other join-related APIs.
-    consolidated_joined_tuples_.emplace_back(joined_tuples_.front().first,
-                                             std::vector<std::pair<tuple_id, tuple_id>>());
-
-    for (const std::pair<block_id, std::pair<tuple_id, tuple_id>> &match_entry
-         : joined_tuples_) {
-      if (match_entry.first == consolidated_joined_tuples_.back().first) {
-        consolidated_joined_tuples_.back().second.emplace_back(match_entry.second);
-      } else {
-        consolidated_joined_tuples_.emplace_back(
-            match_entry.first,
-            std::vector<std::pair<tuple_id, tuple_id>>(1, match_entry.second));
-      }
-    }
-  }
-
-  // Get a mutable pointer to the collected joined tuple ID pairs. The returned
-  // vector has a single entry for each inner block where there are matching
-  // joined tuples (the inner block's ID is the first element of the pair). The
-  // second element of each pair is another vector consisting of pairs of
-  // joined tuple IDs (tuple ID from inner block on the left, from outer block
-  // on the right).
-  inline std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>>*
-      getJoinedTuples() {
-    return &consolidated_joined_tuples_;
-  }
-
- private:
-  // Unsorted vector of join matches that is appended to by call operator().
-  std::vector<std::pair<block_id, std::pair<tuple_id, tuple_id>>> joined_tuples_;
-
-  // Joined tuples sorted by inner block_id. consolidate() populates this from
-  // 'joined_tuples_'.
-  std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>>
-      consolidated_joined_tuples_;
-};
-
 class SemiAntiJoinTupleCollector {
  public:
   explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
@@ -516,21 +417,12 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
 
 
 void HashInnerJoinWorkOrder::execute() {
-  if (FLAGS_vector_based_joined_tuple_collector) {
-    executeWithCollectorType<VectorBasedJoinedTupleCollector>();
-  } else {
-    executeWithCollectorType<MapBasedJoinedTupleCollector>();
-  }
-}
-
-template <typename CollectorT>
-void HashInnerJoinWorkOrder::executeWithCollectorType() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
 
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
-  CollectorT collector;
+  MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
@@ -544,7 +436,6 @@ void HashInnerJoinWorkOrder::executeWithCollectorType() {
         any_join_key_attributes_nullable_,
         &collector);
   }
-  collector.consolidate();
 
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
@@ -637,8 +528,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // TODO(harshad) - Make this function work with both types of collectors.
-
   // We collect all the matching probe relation tuples, as there's a residual
   // predicate that needs to be applied after collecting these matches.
   MapBasedJoinedTupleCollector collector;
@@ -810,7 +699,6 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
 
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
-  // TODO(harshad) - Make the following code work with both types of collectors.
   MapBasedJoinedTupleCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/33470032/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 9762f04..5d3d7da 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -356,9 +356,6 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  template <typename CollectorT>
-  void executeWithCollectorType();
-
   const CatalogRelationSchema &build_relation_;
   const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;


[6/6] incubator-quickstep git commit: Added ExecutionGenerator support for Window Aggregation.

Posted by zu...@apache.org.
Added ExecutionGenerator support for Window Aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9a5ecd28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9a5ecd28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9a5ecd28

Branch: refs/heads/SQL-window-aggregation
Commit: 9a5ecd2884eb583a406a95fdcef103f863f37faf
Parents: 04c8224
Author: shixuan-fan <sh...@apache.org>
Authored: Wed Jun 29 20:25:18 2016 +0000
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Jul 8 10:25:48 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   2 +
 query_execution/QueryContext.cpp                |  14 ++
 query_execution/QueryContext.hpp                |  48 +++++
 query_execution/QueryContext.proto              |   5 +-
 query_optimizer/CMakeLists.txt                  |   2 +
 query_optimizer/ExecutionGenerator.cpp          | 101 ++++++++++-
 query_optimizer/ExecutionGenerator.hpp          |   8 +
 query_optimizer/cost_model/CMakeLists.txt       |   2 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    |   8 +
 .../cost_model/StarSchemaSimpleCostModel.hpp    |   4 +
 query_optimizer/resolver/Resolver.cpp           |   7 +-
 .../tests/execution_generator/Select.test       |  39 ++++
 query_optimizer/tests/resolver/Select.test      |  33 ++++
 relational_operators/CMakeLists.txt             |  14 ++
 .../WindowAggregationOperator.cpp               |  82 +++++++++
 .../WindowAggregationOperator.hpp               | 166 +++++++++++++++++
 relational_operators/WorkOrder.proto            |   9 +
 storage/CMakeLists.txt                          |  45 ++++-
 storage/WindowAggregationOperationState.cpp     | 179 +++++++++++++++++++
 storage/WindowAggregationOperationState.hpp     | 177 ++++++++++++++++++
 storage/WindowAggregationOperationState.proto   |  33 ++++
 ...WindowAggregationOperationState_unittest.cpp |  92 ++++++++++
 24 files changed, 1079 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..8f039c8 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -118,6 +118,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_storage_HashTableFactory
                       quickstep_storage_InsertDestination
                       quickstep_storage_InsertDestination_proto
+                      quickstep_storage_WindowAggregationOperationState
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
                       quickstep_utility_BloomFilter
@@ -130,6 +131,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_storage_AggregationOperationState_proto
                       quickstep_storage_HashTable_proto
                       quickstep_storage_InsertDestination_proto
+                      quickstep_storage_WindowAggregationOperationState_proto
                       quickstep_types_containers_Tuple_proto
                       quickstep_utility_SortConfiguration_proto
                       ${PROTOBUF_LIBRARY})

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 54dd557..7019b6a 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -140,6 +140,13 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
 
     update_groups_.push_back(move(update_group));
   }
+
+  for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
+    window_aggregation_states_.emplace_back(
+        WindowAggregationOperationState::ReconstructFromProto(proto.window_aggregation_states(i),
+                                                              database,
+                                                              storage_manager));
+  }
 }
 
 bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
@@ -231,6 +238,13 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
+  for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
+    if (!WindowAggregationOperationState::ProtoIsValid(proto.window_aggregation_states(i),
+                                                       database)) {
+      return false;
+    }
+  }
+
   return proto.IsInitialized();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7d5628d..9171250 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -33,6 +33,7 @@
 #include "storage/AggregationOperationState.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
@@ -120,6 +121,11 @@ class QueryContext {
   typedef std::uint32_t update_group_id;
 
   /**
+   * @brief A unique identifier for a window aggregation state.
+   **/
+  typedef std::uint32_t window_aggregation_state_id;
+
+  /**
    * @brief Constructor.
    *
    * @param proto A serialized Protocol Buffer representation of a
@@ -460,6 +466,47 @@ class QueryContext {
     return update_groups_[id];
   }
 
+  /**
+   * @brief Whether the given WindowAggregationOperationState id is valid.
+   *
+   * @param id The WindowAggregationOperationState id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidWindowAggregationStateId(const window_aggregation_state_id id) const {
+    return id < window_aggregation_states_.size();
+  }
+
+  /**
+   * @brief Get the WindowAggregationOperationState.
+   *
+   * @param id The WindowAggregationOperationState id in the query.
+   *
+   * @return The WindowAggregationOperationState, already created in the
+   *         constructor.
+   **/
+  inline WindowAggregationOperationState* getWindowAggregationState(
+      const window_aggregation_state_id id) {
+    DCHECK_LT(id, window_aggregation_states_.size());
+    DCHECK(window_aggregation_states_[id]);
+    return window_aggregation_states_[id].get();
+  }
+
+  /**
+   * @brief Release ownership of the given WindowAggregationOperationState.
+   *
+   * @param id The id of the WindowAggregationOperationState to release.
+   *
+   * @return The WindowAggregationOperationState created in the constructor;
+   *         the caller takes ownership.
+   **/
+  inline WindowAggregationOperationState* releaseWindowAggregationState(
+      const window_aggregation_state_id id) {
+    DCHECK_LT(id, window_aggregation_states_.size());
+    DCHECK(window_aggregation_states_[id]);
+    return window_aggregation_states_[id].release();
+  }
+
  private:
   std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
   std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
@@ -471,6 +518,7 @@ class QueryContext {
   std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;
   std::vector<std::unique_ptr<Tuple>> tuples_;
   std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
+  std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_;
 
   DISALLOW_COPY_AND_ASSIGN(QueryContext);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 98cd0b6..ddf8326 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -22,6 +22,7 @@ import "expressions/table_generator/GeneratorFunction.proto";
 import "storage/AggregationOperationState.proto";
 import "storage/HashTable.proto";
 import "storage/InsertDestination.proto";
+import "storage/WindowAggregationOperationState.proto";
 import "types/containers/Tuple.proto";
 import "utility/BloomFilter.proto";
 import "utility/SortConfiguration.proto";
@@ -55,5 +56,7 @@ message QueryContext {
   // NOTE(zuyu): For UpdateWorkOrder only.
   repeated UpdateGroup update_groups = 10;
 
-  required uint64 query_id = 11;
+  repeated WindowAggregationOperationState window_aggregation_states = 11;
+
+  required uint64 query_id = 12;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 8912414..7e53b9d 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -88,6 +88,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_queryoptimizer_expressions_ScalarLiteral
+                      quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
@@ -130,6 +131,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UpdateOperator
+                      quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_storage_AggregationOperationState_proto
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTable_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 45f5f78..43d63f9 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -63,6 +63,7 @@
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
@@ -104,6 +105,7 @@
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
 #include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WindowAggregationOperator.hpp"
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTable.pb.h"
 #include "storage/HashTableFactory.hpp"
@@ -284,8 +286,8 @@ void ExecutionGenerator::generatePlanInternal(
       return convertUpdateTable(
           std::static_pointer_cast<const P::UpdateTable>(physical_plan));
     case P::PhysicalType::kWindowAggregate:
-      THROW_SQL_ERROR()
-          << "Window aggregate function is not supported yet :(";
+      return convertWindowAggregate(
+          std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
     default:
       LOG(FATAL) << "Unknown physical plan node "
                  << physical_plan->getShortString();
@@ -1639,5 +1641,100 @@ void ExecutionGenerator::convertTableGenerator(
   temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
 }
 
+void ExecutionGenerator::convertWindowAggregate(
+    const P::WindowAggregatePtr &physical_plan) {
+  // Create window_aggregation_operation_state proto.
+  const QueryContext::window_aggregation_state_id window_aggr_state_index =
+      query_context_proto_->window_aggregation_states_size();
+  S::WindowAggregationOperationState *window_aggr_state_proto =
+      query_context_proto_->add_window_aggregation_states();
+
+  // Get input.
+  const CatalogRelationInfo *input_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->input());
+  window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+
+  // Get window aggregate function expression.
+  const E::AliasPtr &named_window_aggregate_expression =
+      physical_plan->window_aggregate_expression();
+  const E::WindowAggregateFunctionPtr &window_aggregate_function =
+      std::static_pointer_cast<const E::WindowAggregateFunction>(
+          named_window_aggregate_expression->expression());
+
+  // Set the AggregateFunction.
+  window_aggr_state_proto->mutable_function()->MergeFrom(
+      window_aggregate_function->window_aggregate().getProto());
+
+  // Set the arguments.
+  for (const E::ScalarPtr &argument : window_aggregate_function->arguments()) {
+    unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+    window_aggr_state_proto->add_arguments()->MergeFrom(concretized_argument->getProto());
+  }
+
+  // Set partition keys.
+  const E::WindowInfo &window_info = window_aggregate_function->window_info();
+  for (const E::ScalarPtr &partition_by_attribute
+      : window_info.partition_by_attributes) {
+    unique_ptr<const Scalar> concretized_partition_by_attribute(
+        partition_by_attribute->concretize(attribute_substitution_map_));
+    window_aggr_state_proto->add_partition_by_attributes()
+        ->MergeFrom(concretized_partition_by_attribute->getProto());
+  }
+
+  // Set window frame info.
+  if (window_info.frame_info == nullptr) {
+    // If the frame is not specified, use the default setting:
+    //   1. If ORDER BY key is specified, use cumulative aggregation:
+    //      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+    //   2. If ORDER BY key is not specified either, use the whole partition:
+    //      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+    window_aggr_state_proto->set_is_row(true);  // frame mode: ROWS.
+    window_aggr_state_proto->set_num_preceding(-1);  // UNBOUNDED PRECEDING.
+    window_aggr_state_proto->set_num_following(
+        window_info.order_by_attributes.empty()
+            ? -1  // UNBOUNDED FOLLOWING.
+            : 0);  // CURRENT ROW.
+  } else {
+    const E::WindowFrameInfo *window_frame_info = window_info.frame_info;
+    window_aggr_state_proto->set_is_row(window_frame_info->is_row);
+    window_aggr_state_proto->set_num_preceding(window_frame_info->num_preceding);
+    window_aggr_state_proto->set_num_following(window_frame_info->num_following);
+  }
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new WindowAggregationOperator(query_handle_->query_id(),
+                                        *output_relation,
+                                        window_aggr_state_index,
+                                        insert_destination_index));
+
+  // TODO(Shixuan): Once parallelism is introduced, the is_pipeline_breaker
+  //                could be set to false.
+  if (!input_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(window_aggregation_operator_index,
+                                         input_relation_info->producer_operator_index,
+                                         true /* is_pipeline_breaker */);
+  }
+
+  insert_destination_proto->set_relational_op_index(window_aggregation_operator_index);
+
+  // Add to map and temporary_relation_info_vec_.
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(window_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index,
+                                            output_relation);
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index c7fd018..9186707 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -60,6 +60,7 @@
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
 #include "query_optimizer/physical/UpdateTable.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -347,6 +348,13 @@ class ExecutionGenerator {
   void convertTableGenerator(const physical::TableGeneratorPtr &physical_plan);
 
   /**
+   * @brief Converts a physical WindowAggregate to a WindowAggregation operator.
+   *
+   * @param physical_plan The WindowAggregate to be converted.
+   */
+  void convertWindowAggregate(const physical::WindowAggregatePtr &physical_plan);
+
+  /**
    * @brief Converts a list of NamedExpressions in the optimizer expression
    *        system to scalars proto in QueryContext proto.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 6bf5240..5d5b596 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -40,6 +40,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_queryoptimizer_physical_TableGenerator
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       glog
@@ -64,6 +65,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_physical_TableGenerator
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_utility_Macros)
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index 48f76fa..e5222ff 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -33,6 +33,7 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
 
 #include "glog/logging.h"
 
@@ -72,6 +73,9 @@ std::size_t SimpleCostModel::estimateCardinality(
       return estimateCardinality(
           shared_subplans_[shared_subplan_reference->subplan_id()]);
     }
+    case P::PhysicalType::kWindowAggregate:
+      return estimateCardinalityForWindowAggregate(
+          std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
     default:
       LOG(FATAL) << "Unsupported physical plan:" << physical_plan->toString();
   }
@@ -118,6 +122,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
+    const physical::WindowAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->input());
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 9862198..9837039 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -32,6 +32,7 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -88,6 +89,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Return the estimated cardinality of the input plan.
+  std::size_t estimateCardinalityForWindowAggregate(
+      const physical::WindowAggregatePtr &physical_plan);
+
   const std::vector<physical::PhysicalPtr> &shared_subplans_;
 
   DISALLOW_COPY_AND_ASSIGN(SimpleCostModel);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index eb9fcc1..badfeb1 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -85,6 +85,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kSort:
       return estimateCardinality(
           std::static_pointer_cast<const P::Sort>(physical_plan)->input());
+    case P::PhysicalType::kWindowAggregate:
+      return estimateCardinalityForWindowAggregate(
+          std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
     default:
       LOG(FATAL) << "Unsupported physical plan:" << physical_plan->toString();
   }
@@ -141,6 +144,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
+    const P::WindowAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->input());
+}
+
 double StarSchemaSimpleCostModel::estimateSelectivity(
     const physical::PhysicalPtr &physical_plan) {
   switch (physical_plan->getPhysicalType()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index c63e55a..83032cf 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -33,6 +33,7 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -94,6 +95,9 @@ class StarSchemaSimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  std::size_t estimateCardinalityForWindowAggregate(
+      const physical::WindowAggregatePtr &physical_plan);
+
   double estimateSelectivityForSelection(
       const physical::SelectionPtr &physical_plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c07751a..f10378b 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -1860,12 +1860,17 @@ L::LogicalPtr Resolver::resolveJoinedTableReference(
 L::LogicalPtr Resolver::resolveSortInWindow(
     const L::LogicalPtr &logical_plan,
     const E::WindowInfo &window_info) {
-  // Sort the table by (p_key, o_key)
+  // Sort the table by (p_key, o_key).
   std::vector<E::AttributeReferencePtr> sort_attributes(window_info.partition_by_attributes);
   sort_attributes.insert(sort_attributes.end(),
                          window_info.order_by_attributes.begin(),
                          window_info.order_by_attributes.end());
 
+  // If (p_key, o_key) is empty, no sort is needed.
+  if (sort_attributes.empty()) {
+    return logical_plan;
+  }
+
   std::vector<bool> sort_directions(
       window_info.partition_by_attributes.size(), true);
   sort_directions.insert(sort_directions.end(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 05f7108..16127cc 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -950,3 +950,42 @@ WHERE double_col < 0
 +--------------------+
 |                   5|
 +--------------------+
+==
+
+# Window Aggregation Test.
+# Currently this is not supported; an empty table will be returned.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
++------------------------+
+|avg(int_col)            |
++------------------------+
++------------------------+
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY char_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
++-----------+------------------------+
+|int_col    |sum(float_col)          |
++-----------+------------------------+
++-----------+------------------------+
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
++------------------------+
+|sum(avg(int_col))       |
++------------------------+
+|                    NULL|
++------------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/query_optimizer/tests/resolver/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index 89ab84d..5e11ac0 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -3269,6 +3269,39 @@ TopLevelPlan
     type=Double NULL]
 ==
 
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| |   relation=$window_aggregate,type=Double NULL]
+| |   +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[],
+| |     nulls_first=[],frame_mode=row,num_preceding=-1,num_following=-1]
+| |     +-arguments=
+| |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |     +-partition_by=
+| |     | +-[]
+| |     +-order_by=
+| |       +-[]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+|       relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
 SELECT int_col, sum(float_col) OVER w1 FROM test
 WINDOW w2 AS
 (PARTITION BY vchar_col, long_col

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 91d1097..249441d 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGener
 add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
 add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp)
 add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp)
+add_library(quickstep_relationaloperators_WindowAggregationOperator WindowAggregationOperator.cpp WindowAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp)
 add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp)
 add_library(quickstep_relationaloperators_WorkOrder_proto
@@ -423,6 +424,18 @@ target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       quickstep_threading_ThreadIDBasedMap
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros
+                      tmb)                      
 target_link_libraries(quickstep_relationaloperators_WorkOrder
                       glog
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -487,6 +500,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UpdateOperator
+                      quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_WorkOrder_proto)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
new file mode 100644
index 0000000..93cb9d4
--- /dev/null
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -0,0 +1,82 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "relational_operators/WindowAggregationOperator.hpp"
+
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool WindowAggregationOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+
+  if (blocking_dependencies_met_ && !generated_) {
+    container->addNormalWorkOrder(
+        new WindowAggregationWorkOrder(
+            query_id_,
+            query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+            query_context->getInsertDestination(output_destination_index_)),
+        op_index_);
+    generated_ = true;
+  }
+
+  return generated_;
+}
+
+bool WindowAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (blocking_dependencies_met_ && !generated_) {
+    container->addWorkOrderProto(createWorkOrderProto(), op_index_);
+    generated_ = true;
+  }
+
+  return generated_;
+}
+
+serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::WINDOW_AGGREGATION);
+  proto->set_query_id(query_id_);
+  proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
+                      window_aggregation_state_index_);
+  proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
+                      output_destination_index_);
+
+  return proto;
+}
+
+
+void WindowAggregationWorkOrder::execute() {
+  std::cout << "Window aggregation is not supported yet.\n"
+      << "An empty table is returned\n";
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
new file mode 100644
index 0000000..f3dfd14
--- /dev/null
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -0,0 +1,166 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class StorageManager;
+class WindowAggregationOperationState;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which performs window aggregation over a relation.
+ **/
+class WindowAggregationOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of this query.
+   * @param output_relation The output relation of the window aggregation.
+   * @param window_aggregation_state_index The index of WindowAggregationState
+   *                                       in QueryContext.
+   * @param output_destination_index The index of InsertDestination in
+   *                                 QueryContext for the output.
+   **/
+  WindowAggregationOperator(const std::size_t query_id,
+                            const CatalogRelation &output_relation,
+                            const QueryContext::window_aggregation_state_id window_aggregation_state_index,
+                            const QueryContext::insert_destination_id output_destination_index)
+      : RelationalOperator(query_id),
+        output_relation_(output_relation),
+        window_aggregation_state_index_(window_aggregation_state_index),
+        output_destination_index_(output_destination_index),
+        generated_(false) {}
+
+  ~WindowAggregationOperator() override {}
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_.getID();
+  }
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return output_destination_index_;
+  }
+
+ private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @return A window aggregation work order proto.
+   **/
+  serialization::WorkOrder* createWorkOrderProto();
+
+  const CatalogRelation &output_relation_;
+  const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
+  const QueryContext::insert_destination_id output_destination_index_;
+  bool generated_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by WindowAggregationOperator.
+ **/
+class WindowAggregationWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param query_id The ID of this query.
+   * @param state The WindowAggregationOperationState to use.
+   * @param output_destination The InsertDestination for output.
+   **/
+  WindowAggregationWorkOrder(const std::size_t query_id,
+                             WindowAggregationOperationState *state,
+                             InsertDestination *output_destination)
+      : WorkOrder(query_id),
+        state_(state),
+        output_destination_(output_destination)  {}
+
+  ~WindowAggregationWorkOrder() override {}
+
+  /**
+   * @brief Get the pointer to WindowAggregationOperationState.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return A pointer to the window aggregation operation state.
+   **/
+  WindowAggregationOperationState* state() {
+    return state_;
+  }
+
+  /**
+   * @brief Get the pointer to output destination.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return A pointer to the output destination.
+   **/
+  InsertDestination* output_destination() {
+    return output_destination_;
+  }
+
+  void execute() override;
+
+ private:
+  WindowAggregationOperationState *state_;
+  InsertDestination *output_destination_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 3ed065a..69dee1b 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -41,6 +41,7 @@ enum WorkOrderType {
   TABLE_GENERATOR = 17;
   TEXT_SCAN = 18;
   UPDATE = 19;
+  WINDOW_AGGREGATION = 20;
 }
 
 message WorkOrder {
@@ -243,3 +244,11 @@ message UpdateWorkOrder {
     optional fixed64 block_id = 325;
   }
 }
+
+message WindowAggregationWorkOrder {
+  extend WorkOrder {
+    // All required
+    optional uint32 window_aggr_state_index = 336;
+    optional int32 insert_destination_index = 337;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b536411..9df66e1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -131,6 +131,9 @@ QS_PROTOBUF_GENERATE_CPP(storage_InsertDestination_proto_srcs
 QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
                          storage_StorageBlockLayout_proto_hdrs
                          StorageBlockLayout.proto)
+QS_PROTOBUF_GENERATE_CPP(storage_WindowAggregationOperationState_proto_srcs
+                         storage_WindowAggregationOperationState_proto_hdrs
+                         WindowAggregationOperationState.proto)
 
 if (ENABLE_DISTRIBUTED)
   GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
@@ -254,6 +257,11 @@ add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
 add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
 add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
+add_library(quickstep_storage_WindowAggregationOperationState
+            WindowAggregationOperationState.hpp
+            WindowAggregationOperationState.cpp)
+add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_WindowAggregationOperationState_proto_srcs})
+
 
 # Link dependencies:
 target_link_libraries(quickstep_storage_AggregationOperationState
@@ -1038,6 +1046,27 @@ target_link_libraries(quickstep_storage_ValueAccessorUtil
                       quickstep_storage_ValueAccessor
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_WindowAggregationOperationState
+                      glog
+                      quickstep_catalog_CatalogDatabaseLite
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_ExpressionFactories
+                      quickstep_expressions_Expressions_proto
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregateFunctionFactory
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_expressions_aggregation_AggregateFunction_proto
+                      quickstep_expressions_Expressions_proto
+                      ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:
 add_library(quickstep_storage ../empty_src.cpp StorageModule.hpp)
@@ -1096,7 +1125,9 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil)
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_storage_WindowAggregationOperationState
+                      quickstep_storage_WindowAggregationOperationState_proto)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerHdfs)
@@ -1636,6 +1667,18 @@ target_link_libraries(StorageManager_unittest
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_utility_ShardedLockManager)
+
+add_executable(WindowAggregationOperationState_unittest
+               "${CMAKE_CURRENT_SOURCE_DIR}/tests/WindowAggregationOperationState_unittest.cpp")
+target_link_libraries(WindowAggregationOperationState_unittest
+                      gtest
+                      gtest_main
+                      quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_WindowAggregationOperationState
+                      quickstep_storage_WindowAggregationOperationState_proto
+                      ${LIBS})
 if (QUICKSTEP_HAVE_LIBNUMA)
   target_link_libraries(StorageManager_unittest
                         ${LIBNUMA_LIBRARY})

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
new file mode 100644
index 0000000..a0bcc37
--- /dev/null
+++ b/storage/WindowAggregationOperationState.cpp
@@ -0,0 +1,179 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "storage/WindowAggregationOperationState.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabaseLite.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/ExpressionFactories.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationOperationState::WindowAggregationOperationState(
+    const CatalogRelationSchema &input_relation,
+    const AggregateFunction *window_aggregate_function,
+    std::vector<std::unique_ptr<const Scalar>> &&arguments,
+    std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following,
+    StorageManager *storage_manager)
+    : input_relation_(input_relation),
+      arguments_(std::move(arguments)),
+      partition_by_attributes_(std::move(partition_by_attributes)),
+      is_row_(is_row),
+      num_preceding_(num_preceding),
+      num_following_(num_following),
+      storage_manager_(storage_manager) {
+  // Get the Types of this window aggregate's arguments so that we can create an
+  // AggregationHandle.
+  // TODO(Shixuan): Next step: New handles for window aggregation function.
+  std::vector<const Type*> argument_types;
+  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+    argument_types.emplace_back(&argument->getType());
+  }
+
+  // Check if window aggregate function could apply to the arguments.
+  DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+
+  // Create the handle and initial state.
+  window_aggregation_handle_.reset(
+      window_aggregate_function->createHandle(argument_types));
+  window_aggregation_state_.reset(
+      window_aggregation_handle_->createInitialState());
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // See if all of this window aggregate's arguments are attributes in the input
+  // relation. If so, remember the attribute IDs so that we can do copy elision
+  // when actually performing the window aggregation.
+  arguments_as_attributes_.reserve(arguments_.size());
+  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+    const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+    if (argument_id == -1) {
+      arguments_as_attributes_.clear();
+      break;
+    } else {
+      DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+      arguments_as_attributes_.push_back(argument_id);
+    }
+  }
+#endif
+}
+
+WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
+    const serialization::WindowAggregationOperationState &proto,
+    const CatalogDatabaseLite &database,
+    StorageManager *storage_manager) {
+  DCHECK(ProtoIsValid(proto, database));
+
+  // Rebuild constructor arguments from their representation in 'proto'.
+  const AggregateFunction *aggregate_function
+      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+
+  std::vector<std::unique_ptr<const Scalar>> arguments;
+  arguments.reserve(proto.arguments_size());
+  for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
+    arguments.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.arguments(argument_idx),
+        database));
+  }
+
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+  for (int attribute_idx = 0;
+       attribute_idx < proto.partition_by_attributes_size();
+       ++attribute_idx) {
+    partition_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.partition_by_attributes(attribute_idx),
+        database));
+  }
+
+  const bool is_row = proto.is_row();
+  const std::int64_t num_preceding = proto.num_preceding();
+  const std::int64_t num_following = proto.num_following();
+
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
+                                             aggregate_function,
+                                             std::move(arguments),
+                                             std::move(partition_by_attributes),
+                                             is_row,
+                                             num_preceding,
+                                             num_following,
+                                             storage_manager);
+}
+
+bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+                                                   const CatalogDatabaseLite &database) {
+  if (!proto.IsInitialized() ||
+      !database.hasRelationWithId(proto.relation_id())) {
+    return false;
+  }
+
+  if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+    return false;
+  }
+
+  // TODO(chasseur): We may also want to check that the specified
+  // AggregateFunction is applicable to the specified arguments, but that
+  // requires partial deserialization and may be too heavyweight for this
+  // method.
+  // TODO(Shixuan): The TODO for AggregateFunction could also be applied here.
+  for (int argument_idx = 0;
+       argument_idx < proto.arguments_size();
+       ++argument_idx) {
+    if (!ScalarFactory::ProtoIsValid(proto.arguments(argument_idx), database)) {
+      return false;
+    }
+  }
+
+  for (int attribute_idx = 0;
+       attribute_idx < proto.partition_by_attributes_size();
+       ++attribute_idx) {
+    if (!ScalarFactory::ProtoIsValid(proto.partition_by_attributes(attribute_idx),
+                                     database)) {
+      return false;
+    }
+  }
+
+  if (proto.num_preceding() < -1 || proto.num_following() < -1) {
+    return false;
+  }
+
+  return true;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
new file mode 100644
index 0000000..d7b3e6a
--- /dev/null
+++ b/storage/WindowAggregationOperationState.hpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+#define QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class CatalogDatabaseLite;
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief Helper class for maintaining the state of window aggregation.
+ **/
+class WindowAggregationOperationState {
+ public:
+  /**
+   * @brief Constructor for window aggregation operation state.
+   *
+   * @param input_relation Input relation on which window aggregation is computed.
+   * @param window_aggregate_function The window aggregate function to be
+   *                                  computed.
+   * @param arguments A list of argument expressions to that aggregate.
+   * @param partition_by_attributes The list of window partition keys.
+   * @param is_row True if the window frame is calculated by ROW, false if it is
+   *               calculated by RANGE.
+   * @param num_preceding The number of rows/range for the tuples preceding the
+   *                      current row. -1 means UNBOUNDED PRECEDING.
+   * @param num_following The number of rows/range for the tuples following the
+   *                      current row. -1 means UNBOUNDED FOLLOWING.
+   * @param storage_manager The StorageManager to use for allocating hash
+   *        tables.
+   **/
+  WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+                                  const AggregateFunction *window_aggregate_function,
+                                  std::vector<std::unique_ptr<const Scalar>> &&arguments,
+                                  std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following,
+                                  StorageManager *storage_manager);
+
+  ~WindowAggregationOperationState() {}
+
+  /**
+   * @brief Generate the window aggregation operation state from the serialized
+   *        Protocol Buffer representation.
+   *
+   * @param proto A serialized protocol buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the
+   *        optimizer.
+   * @param database The database for resolving relation and attribute references.
+   * @param storage_manager The StorageManager to use.
+   * @return A pointer to the reconstructed WindowAggregationOperationState.
+   **/
+  static WindowAggregationOperationState* ReconstructFromProto(
+      const serialization::WindowAggregationOperationState &proto,
+      const CatalogDatabaseLite &database,
+      StorageManager *storage_manager);
+
+  /**
+   * @brief Check whether a serialization::WindowAggregationOperationState is
+   *        fully-formed and all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a
+   *        WindowAggregationOperationState, originally generated by the optimizer.
+   * @param database The Database to resolve relation and attribute references
+   *        in.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+                           const CatalogDatabaseLite &database);
+
+  /**
+   * @brief Get the is_row info.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return True if the frame mode is ROW, false if it is RANGE.
+   **/
+  const bool is_row() const { return is_row_; }
+
+  /**
+   * @brief Get the num_preceding info.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return The number of rows/range preceding the current row (-1 if UNBOUNDED).
+   **/
+  const std::int64_t num_preceding() const { return num_preceding_; }
+
+  /**
+   * @brief Get the num_following info.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return The number of rows/range following the current row (-1 if UNBOUNDED).
+   **/
+  const std::int64_t num_following() const { return num_following_; }
+
+  /**
+   * @brief Get the pointer to StorageManager.
+   * @note This is a quickfix for "unused variable". After the window aggregate
+   *       functions are built, these methods might be dropped.
+   *
+   * @return A pointer to the storage manager.
+   **/
+  StorageManager *storage_manager() { return storage_manager_; }
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+
+  // TODO(Shixuan): Handle and State for window aggregation will be needed for
+  //                actual calculation.
+  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
+  std::unique_ptr<AggregationState> window_aggregation_state_;
+  std::vector<std::unique_ptr<const Scalar>> arguments_;
+
+  // We don't add order_by_attributes here since it is not needed after sorting.
+  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
+
+  // Window framing information (a bound of -1 means UNBOUNDED).
+  const bool is_row_;
+  const std::int64_t num_preceding_;
+  const std::int64_t num_following_;
+
+  StorageManager *storage_manager_;
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // If all of an aggregate's argument expressions are simply attributes in
+  // 'input_relation_', then this caches the attribute IDs of those arguments.
+  std::vector<attribute_id> arguments_as_attributes_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
new file mode 100644
index 0000000..c7bd0ef
--- /dev/null
+++ b/storage/WindowAggregationOperationState.proto
@@ -0,0 +1,33 @@
+//   Copyright 2011-2015 Quickstep Technologies LLC.
+//   Copyright 2015-2016 Pivotal Software, Inc.
+//   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+//     University of Wisconsin—Madison.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/Expressions.proto";
+
+message WindowAggregationOperationState {
+  required int32 relation_id = 1;  // Must name a relation in the database.
+  required AggregateFunction function = 2;  // The window aggregate function.
+  repeated Scalar arguments = 3;  // Argument expressions to the function.
+  repeated Scalar partition_by_attributes = 4;  // Window partition keys.
+  required bool is_row = 5;  // True for a ROW frame, false for RANGE.
+  required int64 num_preceding = 6;  // -1 means UNBOUNDED PRECEDING.
+  required int64 num_following = 7;  // -1 means UNBOUNDED FOLLOWING.
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9a5ecd28/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
new file mode 100644
index 0000000..c572034
--- /dev/null
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -0,0 +1,92 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstddef>
+#include <memory>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "storage/WindowAggregationOperationState.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "gtest/gtest.h"
+
+using std::unique_ptr;
+
+namespace quickstep {
+
+namespace {
+  constexpr relation_id kInvalidTableId = 100;  // Presumably never assigned by addRelation() in SetUp().
+  constexpr std::int64_t kInvalidNum = -10;  // Below -1, so ProtoIsValid() must reject it.
+  constexpr std::int64_t kValidNum = 10;  // A frame bound that ProtoIsValid() accepts.
+}  // namespace
+
+class WindowAggregationOperationStateProtoTest : public ::testing::Test {
+ protected:
+  virtual void SetUp() {  // Builds a fresh database "db" holding one relation "rel".
+    database_.reset(new CatalogDatabase(nullptr, "db"));
+    rel_id_ = database_->addRelation(new CatalogRelation(nullptr, "rel"));
+  }
+
+  unique_ptr<CatalogDatabase> database_;
+  relation_id rel_id_;  // Value returned by addRelation() for "rel".
+};
+
+TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
+  serialization::WindowAggregationOperationState proto;  // No required field is set.
+  EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
+  serialization::WindowAggregationOperationState proto;
+  proto.set_relation_id(kInvalidTableId);  // The only field intended to be bad.
+  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_is_row(true);
+  proto.set_num_preceding(kValidNum);
+  proto.set_num_following(kValidNum);
+  EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
+  serialization::WindowAggregationOperationState proto;
+  proto.set_relation_id(rel_id_);
+  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_is_row(true);
+  proto.set_num_preceding(kInvalidNum);  // Case 1: only num_preceding is out of range.
+  proto.set_num_following(kValidNum);
+  EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+
+  proto.set_num_preceding(kValidNum);
+  proto.set_num_following(kInvalidNum);  // Case 2: only num_following is out of range.
+  EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
+  serialization::WindowAggregationOperationState proto;  // Every required field is set below.
+  proto.set_relation_id(rel_id_);
+  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_is_row(true);
+  proto.set_num_preceding(kValidNum);
+  proto.set_num_following(kValidNum);
+  EXPECT_TRUE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+}  // namespace quickstep


[5/6] incubator-quickstep git commit: Minor changes in profiling work order output.

Posted by zu...@apache.org.
Minor changes in profiling work order output.

- Now prints query ID along with each work order entry.
- Removed spaces between two columns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/04c8224b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/04c8224b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/04c8224b

Branch: refs/heads/SQL-window-aggregation
Commit: 04c8224b1584b982412c7023d041c1060d5c0342
Parents: 31f1bbb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jul 6 11:38:49 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jul 6 22:28:28 2016 -0500

----------------------------------------------------------------------
 query_execution/Foreman.cpp | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/04c8224b/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index f9f2e7a..98146e2 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -238,16 +238,17 @@ void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
   const std::vector<
       std::tuple<std::size_t, std::size_t, std::size_t>>
       &recorded_times = policy_enforcer_->getProfilingResults(query_id);
-  fputs("Worker ID, NUMA Socket, Operator ID, Time (microseconds)\n", out);
+  fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
   for (auto workorder_entry : recorded_times) {
     // Note: Index of the "worker thread index" in the tuple is 0.
     const std::size_t worker_id = std::get<0>(workorder_entry);
     fprintf(out,
-            "%lu, %d, %lu, %lu\n",
+            "%lu,%lu,%d,%lu,%lu\n",
+            query_id,
             worker_id,
             worker_directory_->getNUMANode(worker_id),
-            std::get<1>(workorder_entry),
-            std::get<2>(workorder_entry));
+            std::get<1>(workorder_entry),  // Operator ID.
+            std::get<2>(workorder_entry));  // Time.
   }
 }
 


[4/6] incubator-quickstep git commit: Disallow negative number of worker threads.

Posted by zu...@apache.org.
Disallow negative number of worker threads.

- Fixed a bug so that the Quickstep command line now disallows a negative
  number of worker threads.
- If the user provides zero or fewer worker threads, we switch to the
  default number of worker threads, instead of terminating the process.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/31f1bbb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/31f1bbb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/31f1bbb1

Branch: refs/heads/SQL-window-aggregation
Commit: 31f1bbb1c71d9a18af27ee540c83f513125b656f
Parents: 040a511
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 11:04:29 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Jul 3 23:20:00 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31f1bbb1/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3f99130..02a55a0 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -200,19 +200,18 @@ int main(int argc, char* argv[]) {
   // that we computed above, provided it did return a valid value.
   // TODO(jmp): May need to change this at some point to keep one thread
   //            available for the OS if the hardware concurrency level is high.
-  const unsigned int real_num_workers = quickstep::FLAGS_num_workers != 0
-                                      ? quickstep::FLAGS_num_workers
-                                      : (num_hw_threads != 0 ?
-                                         num_hw_threads
-                                         : 1);
-
-  if (real_num_workers > 0) {
-    printf("Starting Quickstep with %d worker thread(s) and a %.2f GB buffer pool\n",
-           real_num_workers,
-           (static_cast<double>(quickstep::FLAGS_buffer_pool_slots) * quickstep::kSlotSizeBytes)/quickstep::kAGigaByte);
-  } else {
-    LOG(FATAL) << "Quickstep needs at least one worker thread to run";
+  if (quickstep::FLAGS_num_workers <= 0) {
+    LOG(INFO) << "Quickstep expects at least one worker thread, switching to "
+                 "the default number of worker threads";
   }
+  const int real_num_workers = quickstep::FLAGS_num_workers > 0
+                                   ? quickstep::FLAGS_num_workers
+                                   : (num_hw_threads != 0 ? num_hw_threads : 1);
+
+  DCHECK_GT(real_num_workers, 0);
+  printf("Starting Quickstep with %d worker thread(s) and a %.2f GB buffer pool\n",
+         real_num_workers,
+         (static_cast<double>(quickstep::FLAGS_buffer_pool_slots) * quickstep::kSlotSizeBytes)/quickstep::kAGigaByte);
 
 #ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
   if (quickstep::FLAGS_use_hdfs) {


[3/6] incubator-quickstep git commit: Fixed the time measurement from milli to microseconds.

Posted by zu...@apache.org.
Fixed the time measurement from milli to microseconds.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/040a511a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/040a511a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/040a511a

Branch: refs/heads/SQL-window-aggregation
Commit: 040a511aad35cb958d9d532fabc002313952cb11
Parents: b258821
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jul 2 15:58:52 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jul 2 15:58:52 2016 -0500

----------------------------------------------------------------------
 query_execution/Worker.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/040a511a/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ae889c7..6ba27f1 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -121,7 +121,7 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   end = std::chrono::steady_clock::now();
   delete worker_message.getWorkOrder();
   const uint64_t execution_time_microseconds =
-      std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
+      std::chrono::duration_cast<std::chrono::microseconds>(end - start)
           .count();
   // Construct the proto message.
   proto->set_operator_index(worker_message.getRelationalOpIndex());


[2/6] incubator-quickstep git commit: QUICKSTEP-33: Fixed the bug in NumericCast.

Posted by zu...@apache.org.
QUICKSTEP-33: Fixed the bug in NumericCast.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b258821e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b258821e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b258821e

Branch: refs/heads/SQL-window-aggregation
Commit: b258821ef6c00df199e52249eb1099a6d885bbb1
Parents: 3347003
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Wed Jun 29 14:01:13 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 30 13:26:50 2016 -0700

----------------------------------------------------------------------
 types/operations/unary_operations/NumericCastOperation.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b258821e/types/operations/unary_operations/NumericCastOperation.hpp
----------------------------------------------------------------------
diff --git a/types/operations/unary_operations/NumericCastOperation.hpp b/types/operations/unary_operations/NumericCastOperation.hpp
index 250df6d..6662796 100644
--- a/types/operations/unary_operations/NumericCastOperation.hpp
+++ b/types/operations/unary_operations/NumericCastOperation.hpp
@@ -126,7 +126,7 @@ class UncheckedNumericCastOperator : public UncheckedUnaryOperator {
           result->appendNullValue();
         } else {
           *static_cast<typename TargetType::cpptype*>(result->getPtrForDirectWrite())
-              = static_cast<typename SourceType::cpptype>(*scalar_arg);
+              = static_cast<typename TargetType::cpptype>(*scalar_arg);
         }
       }
       return result;