You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/25 20:23:59 UTC

[1/4] incubator-quickstep git commit: Added QueryManagerDistributed. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/SQL-window-aggregation b80f5e9cf -> c0bb4620c (forced update)


Added QueryManagerDistributed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cdecf0b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cdecf0b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cdecf0b5

Branch: refs/heads/SQL-window-aggregation
Commit: cdecf0b561d0cf1c80516d51401bd098a6263659
Parents: db0e7e3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jul 12 22:04:54 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 19 11:46:42 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                               |  10 ++
 query_execution/CMakeLists.txt               |  34 +++-
 query_execution/QueryExecutionMessages.proto |  34 +++-
 query_execution/QueryExecutionTypedefs.hpp   |   3 +
 query_execution/QueryManagerDistributed.cpp  | 159 +++++++++++++++++++
 query_execution/QueryManagerDistributed.hpp  | 105 +++++++++++++
 query_execution/ShiftbossDirectory.hpp       | 181 ++++++++++++++++++++++
 7 files changed, 518 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0e5d3de..0bbde61 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -281,6 +281,16 @@ else()
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
   endif()
 
+  if (ENABLE_DISTRIBUTED)
+    # Clang reports extended-offsetof warnings for code generated by Protoc 3.0 beta.
+    if(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
+      CHECK_CXX_COMPILER_FLAG("-Wno-extended-offsetof" COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+      if (COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-extended-offsetof")
+      endif()
+    endif()
+  endif()
+
   # One of the protobuf headers includes a nested anonymous union within
   # another anonymous type. Ordinarily we work around this by compiling the
   # protobuf libraries themselves with "-Wno-nested-anon-types" and including

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 028531d..8c12a5d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -28,10 +28,10 @@ else()
 endif()
 
 # Declare micro-libs:
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
-add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -46,7 +46,13 @@ add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryE
 add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
 add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
 add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
+endif()
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
+endif()
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -143,6 +149,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_utility_SortConfiguration_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_relationaloperators_WorkOrder_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionState
                       glog
@@ -167,6 +174,22 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_DAG
                       quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryexecution_WorkOrderProtosContainer
+                        quickstep_relationaloperators_RelationalOperator
+                        quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_utility_DAG
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
@@ -182,6 +205,11 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
                       glog
                       quickstep_relationaloperators_WorkOrder_proto
@@ -233,7 +261,9 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_WorkerSelectionPolicy)
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
-                        quickstep_queryexecution_BlockLocator)
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory)
 endif()
 
 # Tests:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..fa20993 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,22 +16,24 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+import "relational_operators/WorkOrder.proto";
+
 // Used for any messages that do not carry payloads.
 message EmptyMessage {
 }
 
 // Note: There are different types of completion messages for normal work orders
 // rebuild work orders. This can be potentially helpful when we want to collect
-// different statistics for executing different types of work orders. 
-// e.g. In select normal work order completion message, we could be interested 
-// in the selectivity of the block whose work order got execute. In rebuild work 
-// order completion message, we may be interested in adding the compression 
+// different statistics for executing different types of work orders.
+// e.g. In select normal work order completion message, we could be interested
+// in the selectivity of the block whose work order got execute. In rebuild work
+// order completion message, we may be interested in adding the compression
 // ratio or dictionary size of the rebuilt block.
 
-// TODO(harshad) : If there are different fields in the two message types below, 
+// TODO(harshad) : If there are different fields in the two message types below,
 // create a base message class called WorkOrderCompletionMessage and make the
 // two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class. 
+// classes can be moved to the base class.
 
 // A message sent upon completion of a normal (not rebuild) WorkOrder execution.
 message NormalWorkOrderCompletionMessage {
@@ -70,6 +72,26 @@ message WorkOrdersAvailableMessage {
   required uint64 query_id = 2;
 }
 
+// Distributed version related messages.
+message WorkOrderMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required WorkOrder work_order = 3;
+}
+
+message InitiateRebuildMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required uint64 insert_destination_index = 3;
+  required  int32 relation_id = 4;
+}
+
+message InitiateRebuildResponseMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required uint64 num_rebuild_work_orders = 3;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..61e76d7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,6 +73,9 @@ enum QueryExecutionMessageType : message_type_id {
   kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
+  kInitiateRebuildMessage,  // From Foreman to Shiftboss.
+  kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
new file mode 100644
index 0000000..e906fa5
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -0,0 +1,159 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+
+namespace quickstep {
+
+QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
+                                                 ShiftbossDirectory *shiftbosses,
+                                                 const tmb::client_id foreman_client_id,
+                                                 tmb::MessageBus *bus)
+    : QueryManagerBase(query_handle),
+      shiftbosses_(shiftbosses),
+      foreman_client_id_(foreman_client_id),
+      bus_(bus),
+      normal_workorder_protos_container_(
+          new WorkOrderProtosContainer(num_operators_in_dag_)) {
+  // Collect all the workorders from all the relational operators in the DAG.
+  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
+    if (checkAllBlockingDependenciesMet(index)) {
+      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
+      processOperator(index, false);
+    }
+  }
+}
+
+serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
+    const dag_node_index start_operator_index) {
+  // Default policy: Operator with lowest index first.
+  size_t num_operators_checked = 0;
+  for (dag_node_index index = start_operator_index;
+       num_operators_checked < num_operators_in_dag_;
+       index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
+    if (query_exec_state_->hasExecutionFinished(index)) {
+      continue;
+    }
+    unique_ptr<serialization::WorkOrder> work_order_proto(
+        normal_workorder_protos_container_->getWorkOrderProto(index));
+    if (work_order_proto != nullptr) {
+      query_exec_state_->incrementNumQueuedWorkOrders(index);
+
+      unique_ptr<serialization::WorkOrderMessage> message_proto(new serialization::WorkOrderMessage);
+      message_proto->set_query_id(query_id_);
+      message_proto->set_operator_index(index);
+      message_proto->mutable_work_order()->MergeFrom(*work_order_proto);
+
+      return message_proto.release();
+    }
+  }
+  // No normal WorkOrder protos available right now.
+  return nullptr;
+}
+
+bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
+  bool generated_new_workorder_protos = false;
+  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+    // Do not fetch any work units until all blocking dependencies are met.
+    // The relational operator is not aware of blocking dependencies for
+    // uncorrelated scalar queries.
+    if (!checkAllBlockingDependenciesMet(index)) {
+      return false;
+    }
+    const size_t num_pending_workorder_protos_before =
+        normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+    const bool done_generation =
+        query_dag_->getNodePayloadMutable(index)
+            ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
+    if (done_generation) {
+      query_exec_state_->setDoneGenerationWorkOrders(index);
+    }
+
+    // TODO(shoban): It would be a good check to see if operator is making
+    // useful progress, i.e., the operator either generates work orders to
+    // execute or still has pending work orders executing. However, this will not
+    // work if Foreman polls operators without feeding data. This check can be
+    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+    // pending work orders are completed or new input blocks feed.
+
+    generated_new_workorder_protos =
+        (num_pending_workorder_protos_before <
+         normal_workorder_protos_container_->getNumWorkOrderProtos(index));
+  }
+  return generated_new_workorder_protos;
+}
+
+// Asks the Shiftboss to start the rebuild for operator 'index'. Returns false,
+// since the number of rebuild work orders is only known once it responds.
+bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
+  DCHECK(checkRebuildRequired(index));
+  DCHECK(!checkRebuildInitiated(index));
+
+  const RelationalOperator &op = query_dag_->getNodePayload(index);
+  DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
+
+  serialization::InitiateRebuildMessage proto;
+  // 'query_id' is declared 'required' in InitiateRebuildMessage, so set it too.
+  proto.set_query_id(query_id_);
+  proto.set_operator_index(index);
+  proto.set_insert_destination_index(op.getInsertDestinationID());
+  proto.set_relation_id(op.getOutputRelationID());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+  TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes),
+                           proto_length,
+                           kInitiateRebuildMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+            << "') to Shiftboss";
+  // TODO(zuyu): Multiple workers support.
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftbosses_->getClientId(0),
+                                     move(tagged_msg));
+
+  // A negative count marks the number of rebuild work orders as still unknown.
+  query_exec_state_->setRebuildStatus(index, -1, true);
+  return false;  // Wait for Shiftbosses to report the number of rebuild work orders.
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
new file mode 100644
index 0000000..8641c22
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -0,0 +1,105 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+
+#include <memory>
+
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class QueryHandle;
+class ShiftbossDirectory;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query including generation
+ *        of new work orders, keeping track of the query execution state.
+ **/
+class QueryManagerDistributed final : public QueryManagerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_handle The QueryHandle object for this query.
+   * @param shiftbosses The ShiftbossDirectory to use.
+   * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param bus The TMB used for communication.
+   **/
+  QueryManagerDistributed(QueryHandle *query_handle,
+                          ShiftbossDirectory *shiftbosses,
+                          const tmb::client_id foreman_client_id,
+                          tmb::MessageBus *bus);
+
+  ~QueryManagerDistributed() override {}
+
+  bool fetchNormalWorkOrders(const dag_node_index index) override;
+
+  /**
+   * @brief Get the next normal workorder to be executed, wrapped in a
+   *        WorkOrderMessage proto.
+   *
+   * @param start_operator_index Begin the search for the schedulable WorkOrder
+   *        with the operator at this index.
+   *
+   * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
+   *         to be executed, return NULL.
+   **/
+  serialization::WorkOrderMessage* getNextWorkOrderMessage(
+      const dag_node_index start_operator_index);
+
+ private:
+  bool checkNormalExecutionOver(const dag_node_index index) const override {
+    return (checkAllDependenciesMet(index) &&
+            !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
+            query_exec_state_->hasDoneGenerationWorkOrders(index));
+  }
+
+  bool initiateRebuild(const dag_node_index index) override;
+
+  bool checkRebuildOver(const dag_node_index index) const override {
+    return query_exec_state_->hasRebuildInitiated(index) &&
+           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  }
+
+  ShiftbossDirectory *shiftbosses_;
+
+  const tmb::client_id foreman_client_id_;
+  tmb::MessageBus *bus_;
+
+  std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
+
+  DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
new file mode 100644
index 0000000..abb5f32
--- /dev/null
+++ b/query_execution/ShiftbossDirectory.hpp
@@ -0,0 +1,181 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class which keeps the metadata about the shiftbosses.
+ *
+ * @note This class is intended to be used only by ForemanDistributed thread.
+ *       Therefore, none of the methods in this class are thread-safe.
+ **/
+class ShiftbossDirectory {
+ public:
+  /**
+   * @brief Constructor.
+  **/
+  ShiftbossDirectory() = default;
+
+  /**
+   * @brief Add the Shiftboss.
+   *
+   * @param shiftboss_id the TMB client ID of Shiftboss thread.
+   * @param capacity The Work Order processing capacity of Shiftboss.
+   **/
+  void addShiftboss(const tmb::client_id shiftboss_id,
+                    const std::size_t capacity) {
+    client_id_indices_.emplace(shiftboss_id, client_ids_.size());
+    client_ids_.push_back(shiftboss_id);
+    work_order_capacities_.push_back(capacity);
+    num_queued_work_orders_.push_back(0u);
+  }
+
+  /**
+   * @brief Whether the ShiftbossDirectory has any Shiftboss.
+   *
+   * @return True if no Shiftboss in ShiftbossDirectory. Otherwise false.
+   **/
+  bool empty() const {
+    DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
+    DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
+    DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
+
+    return client_ids_.empty();
+  }
+
+  /**
+   * @brief Get the number of Shiftboss in ShiftbossDirectory.
+   *
+   * @return The number of Shiftboss in ShiftbossDirectory.
+   **/
+  std::size_t size() const {
+    DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
+    DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
+    DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
+
+    return client_ids_.size();
+  }
+
+  /**
+   * @brief Get the TMB client ID of the specified Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return The TMB client ID of the given Shiftboss.
+   **/
+  tmb::client_id getClientId(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return client_ids_[shiftboss_index];
+  }
+
+  /**
+   * @brief Get the Shiftboss index from the specified client id.
+   *
+   * @param shiftboss_id The TMB client ID of Shiftboss.
+   *
+   * @return The index of the given Shiftboss.
+   **/
+  std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
+    const auto it = client_id_indices_.find(shiftboss_id);
+    DCHECK(it != client_id_indices_.end());
+
+    return it->second;
+  }
+
+  /**
+   * @brief Whether the given Shiftboss has reached its capacity.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return True if reached the capacity. Otherwise false.
+   **/
+  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
+  }
+
+  /**
+   * @brief Add the number of new work orders for the given Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   * @param num_new_work_orders The number of new work orders queued on it.
+   **/
+  void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
+                              const std::size_t num_new_work_orders) {
+    DCHECK_LT(shiftboss_index, size());
+    num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
+  }
+
+  /**
+   * @brief Increase the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    ++num_queued_work_orders_[shiftboss_index];
+  }
+
+  /**
+   * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
+    --num_queued_work_orders_[shiftboss_index];
+  }
+
+ private:
+  // The TMB client IDs of Shiftbosses.
+  // TODO(zuyu): Support deletions, as Shiftbosses go down.
+  std::vector<tmb::client_id> client_ids_;
+
+  // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
+  std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
+
+  // The max number of WorkOrders per Shiftboss.
+  std::vector<std::size_t> work_order_capacities_;
+
+  // The number of WorkOrders queued for execution per Shiftboss; each value
+  // should not exceed the matching entry in work_order_capacities_.
+  std::vector<std::size_t> num_queued_work_orders_;
+
+  DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_


[3/4] incubator-quickstep git commit: - Supported ROWS mode for AVG window aggregation. - Created WindowAggregateFunctions in expressions/window_aggregation. - Created WindowAggregationHandle for AVG to actually do the calculation. - Other functions will

Posted by zu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..c044a98
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,387 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *   limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+  constexpr int kNumTuples = 100;  // Number of rows generated for each test.
+  constexpr int kNumTuplesPerPartition = 8;  // Row i gets partition key i / 8.
+  constexpr int kNullInterval = 25;  // Rows 0, 25, 50, ... hold a NULL argument.
+  constexpr int kNumPreceding = 2;  // Sliding-window frame: rows before the current row.
+  constexpr int kNumFollowing = 2;  // Sliding-window frame: rows after the current row.
+
+}  // namespace
+
+// Test fixture for the AVG window aggregate function and its handle.
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+  // Points handle_avg_ at a new AVG handle for 'argument_type' with one kInt key type.
+  void initializeHandle(const Type &argument_type) {
+    std::vector<const Type*> argument_types(1, &argument_type);
+    std::vector<const Type*> key_types(1, &TypeFactory::GetType(kInt, false));
+    handle_avg_.reset(
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg)
+            .createHandle(std::move(argument_types), std::move(key_types)));
+  }
+
+  // Reports whether AVG accepts one argument of 'typeID' (canApplyToTypes()).
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const bool is_char_like = (typeID == kChar) || (typeID == kVarChar);
+    const Type &type = is_char_like
+        ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+        : TypeFactory::GetType(typeID);
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg)
+        .canApplyToTypes(std::vector<const Type*>(1, &type));
+  }
+
+  // True iff AVG over one 'input_type_id' argument reports type 'output_type_id'.
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                             TypeID output_type_id) {
+    const WindowAggregateFunction &avg = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    const Type &input_type = TypeFactory::GetType(input_type_id);
+    return avg.resultTypeForArgumentTypes(
+        std::vector<const Type*>(1, &input_type))->getTypeID() == output_type_id;
+  }
+
+  // Checks that 'actual' is a native column matching 'expected' entry by entry;
+  // a nullptr in 'expected' stands for a NULL value.
+  template <typename CppType>
+  static void CheckAvgValues(const std::vector<CppType*> &expected, const ColumnVector *actual) {
+    // Fatal assertions: the cast and the indexed reads below rely on them.
+    ASSERT_TRUE(actual->isNative());
+    const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+    ASSERT_EQ(expected.size(), native->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      if (expected[i] == nullptr) {
+        EXPECT_TRUE(native->getTypedValue(i).isNull());
+      } else {
+        EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral<CppType>());
+      }
+    }
+  }
+
+  // Stores the integer 'value' into '*data'; the generic version simply assigns.
+  template <typename CppType>
+  static void SetDataType(int value, CppType *data) {
+    *data = value;  // Implicit int -> CppType conversion.
+  }
+
+  // Runs both window-frame checks for GenericType arguments and OutputType results.
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkWindowAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+
+    // Argument column plus a parallel vector of the raw C++ values (nullptr
+    // marks a NULL row) from which the expected results are computed.
+    std::vector<typename GenericType::cpptype*> argument_cpp_vector;
+    argument_cpp_vector.reserve(kNumTuples);
+    ColumnVector *argument_type_vector =
+        createArgumentGeneric<GenericType>(&argument_cpp_vector);
+
+    // Partition key column: row i belongs to partition i / kNumTuplesPerPartition.
+    NativeColumnVector *partition_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+    for (int i = 0; i < kNumTuples; ++i) {
+      partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+    }
+
+    // Owned here for the duration of both checks, which share the accessor;
+    // previously it was allocated with new and never deleted.
+    std::unique_ptr<ColumnVectorsValueAccessor> tuple_accessor(new ColumnVectorsValueAccessor());
+    tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(argument_type_vector);
+
+    // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+    checkAccumulate<GenericType, OutputType>(tuple_accessor.get(), argument_type_vector, argument_cpp_vector);
+    // Test kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+    checkSlidingWindow<GenericType, OutputType>(tuple_accessor.get(), argument_type_vector, argument_cpp_vector);
+  }
+
+  // Builds the argument column of kNumTuples rows and appends, for each row, a
+  // pointer to its raw C++ value (nullptr for NULL rows) to 'argument_cpp_vector'.
+  template <typename GenericType>
+  ColumnVector *createArgumentGeneric(
+      std::vector<typename GenericType::cpptype*> *argument_cpp_vector) {
+    const GenericType &type = GenericType::Instance(true);
+    const bool is_integral = type.getTypeID() == kInt || type.getTypeID() == kLong;
+    NativeColumnVector *column = new NativeColumnVector(type, kNumTuples);
+    for (int row = 0; row < kNumTuples; ++row) {
+      if (row % kNullInterval == 0) {
+        // Every kNullInterval-th row holds a NULL.
+        argument_cpp_vector->push_back(nullptr);
+        column->appendTypedValue(type.makeNullValue());
+      } else {
+        typename GenericType::cpptype *raw_value = new typename GenericType::cpptype;
+        // NOTE(review): SetDataType() takes an int, so the float passed for
+        // non-integral types is converted (truncated) before it is stored.
+        if (is_integral) {
+          SetDataType(row - 10, raw_value);
+        } else {
+          SetDataType(static_cast<float>(row - 10) / 10, raw_value);
+        }
+        column->appendTypedValue(type.makeValue(raw_value));
+        argument_cpp_vector->push_back(raw_value);
+      }
+    }
+    return column;
+  }
+
+  // Checks ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. The expected value
+  // of a row is the running average of the non-NULL arguments seen so far in
+  // its partition, or NULL while no such argument has been seen.
+  template <typename GenericType, typename OutputType>
+  void checkAccumulate(ColumnVectorsValueAccessor *tuple_accessor,
+                       ColumnVector *argument_type_vector,
+                       const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments(1, argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result = handle_avg_->calculate(
+        tuple_accessor, std::move(arguments), partition_key, true  /* is_row */,
+        -1  /* num_preceding: UNBOUNDED PRECEDING */,
+        0  /* num_following: CURRENT ROW */);
+
+    // Expected values. 'owned_values' releases them when this function returns
+    // (they used to be leaked); 'result_cpp_vector' is the view that is checked.
+    std::vector<std::unique_ptr<typename OutputType::cpptype>> owned_values;
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    typename GenericType::cpptype sum;
+    int count = 0;
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      // Start of new partition: restart the running sum and count.
+      if (i % kNumTuplesPerPartition == 0) {
+        SetDataType(0, &sum);
+        count = 0;
+      }
+
+      typename GenericType::cpptype *value = argument_cpp_vector[i];
+      if (value != nullptr) {
+        sum += *value;
+        count++;
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        owned_values.emplace_back(new typename OutputType::cpptype);
+        *owned_values.back() = static_cast<typename OutputType::cpptype>(sum) / count;
+        result_cpp_vector.push_back(owned_values.back().get());
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  // Checks ROWS BETWEEN kNumPreceding PRECEDING AND kNumFollowing FOLLOWING. The
+  // expected value of a row is the average of the non-NULL arguments among the
+  // row and the frame rows around it in the same partition, or NULL if none.
+  template <typename GenericType, typename OutputType>
+  void checkSlidingWindow(ColumnVectorsValueAccessor *tuple_accessor,
+                          ColumnVector *argument_type_vector,
+                          const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments(1, argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result = handle_avg_->calculate(
+        tuple_accessor, std::move(arguments), partition_key, true  /* is_row */,
+        kNumPreceding, kNumFollowing);
+
+    // Expected values. 'owned_values' releases them when this function returns
+    // (they used to be leaked); 'result_cpp_vector' is the view that is checked.
+    std::vector<std::unique_ptr<typename OutputType::cpptype>> owned_values;
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+
+    const std::size_t num_rows = argument_cpp_vector.size();
+    const std::size_t max_preceding = kNumPreceding;
+    const std::size_t max_following = kNumFollowing;
+    for (std::size_t i = 0; i < num_rows; ++i) {
+      const std::size_t partition = i / kNumTuplesPerPartition;
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        count++;
+      }
+
+      // Preceding rows. The bounds test comes first so that 'i - precede' is
+      // only evaluated when it cannot wrap around.
+      for (std::size_t precede = 1; precede <= max_preceding; ++precede) {
+        if (i < precede || (i - precede) / kNumTuplesPerPartition != partition) {
+          break;
+        }
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          count++;
+        }
+      }
+
+      // Following rows. The loop is bounded by kNumFollowing; it used to be
+      // bounded by kNumPreceding, which only worked while the two were equal.
+      for (std::size_t follow = 1; follow <= max_following; ++follow) {
+        if (i + follow >= num_rows || (i + follow) / kNumTuplesPerPartition != partition) {
+          break;
+        }
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        owned_values.emplace_back(new typename OutputType::cpptype);
+        *owned_values.back() = static_cast<typename OutputType::cpptype>(sum) / count;
+        result_cpp_vector.push_back(owned_values.back().get());
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  std::unique_ptr<WindowAggregationHandle> handle_avg_;
+};
+
+// double specialization: compares with EXPECT_DOUBLE_EQ instead of exact ==.
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValues<double>(
+    const std::vector<double*> &expected, const ColumnVector *actual) {
+  // Fatal assertions: the cast and the indexed reads below rely on them.
+  ASSERT_TRUE(actual->isNative());
+  const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+  ASSERT_EQ(expected.size(), native->size());
+  for (std::size_t i = 0; i < expected.size(); ++i) {
+    if (expected[i] == nullptr) {
+      EXPECT_TRUE(native->getTypedValue(i).isNull());
+    } else {
+      EXPECT_DOUBLE_EQ(*expected[i], native->getTypedValue(i).getLiteral<double>());
+    }
+  }
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
+  data->interval_ticks = value;  // 'value' becomes the literal's tick count.
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
+  data->months = value;  // 'value' becomes the literal's month count.
+}
+
+typedef WindowAggregationHandleAvgTest WindowAggregationHandleAvgDeathTest;  // Same fixture under a *DeathTest name.
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkWindowAggregationAvgGeneric<IntType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkWindowAggregationAvgGeneric<LongType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkWindowAggregationAvgGeneric<FloatType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkWindowAggregationAvgGeneric<DoubleType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {  // Expected values use the interval type itself.
+  checkWindowAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {  // Expected values use the interval type itself.
+  checkWindowAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_DEBUG  // Compiled only when QUICKSTEP_DEBUG is defined.
+TEST_F(WindowAggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");  // The empty pattern accepts any death message.
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");  // The empty pattern accepts any death message.
+}
+#endif  // QUICKSTEP_DEBUG
+
+TEST_F(WindowAggregationHandleAvgDeathTest, canApplyToTypeTest) {  // NOTE(review): not a death test despite the fixture name.
+  EXPECT_TRUE(CanApplyToTypesTest(kInt));  // Numeric types are expected to be accepted.
+  EXPECT_TRUE(CanApplyToTypesTest(kLong));
+  EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+  EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+  EXPECT_FALSE(CanApplyToTypesTest(kChar));  // Character and datetime types are expected to be rejected.
+  EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+  EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));  // Interval types are expected to be accepted.
+  EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, ResultTypeForArgumentTypeTest) {  // NOTE(review): not a death test.
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));  // Numeric arguments: kDouble result expected.
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));  // Intervals keep their type.
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_ExecutionHeuristics

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
   // Get input.
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+  window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
 
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
   const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new WindowAggregationOperator(query_handle_->query_id(),
+                                        *input_relation_info->relation,
                                         *output_relation,
                                         window_aggr_state_index,
                                         insert_destination_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       glog
-                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_Expression

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
 #include <utility>
 #include <vector>
 
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
 }
 
 WindowAggregateFunctionPtr WindowAggregateFunction::Create(
-    const ::quickstep::AggregateFunction &window_aggregate,
+    const ::quickstep::WindowAggregateFunction &window_aggregate,
     const std::vector<ScalarPtr> &arguments,
     const WindowInfo &window_info,
     const std::string &window_name,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..0cc80df 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,8 +33,8 @@
 
 namespace quickstep {
 
-class AggregateFunction;
 class Type;
+class WindowAggregateFunction;
 
 namespace optimizer {
 namespace expressions {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
    * @return The WindowAggregateFunction singleton (from the expression system)
    *         for this node.
    **/
-  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+  inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
     return window_aggregate_;
   }
 
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
    * @param is_distinct Whether this is a DISTINCT aggregation.
    * @return A new AggregateFunctionPtr.
    **/
-  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+  static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
                                            const std::vector<ScalarPtr> &arguments,
                                            const WindowInfo &window_info,
                                            const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
    * @param window_info The window info of the window aggregate function.
    * @param is_distinct Indicates whether this is a DISTINCT aggregation.
    */
-  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+  WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
                           const std::vector<ScalarPtr> &arguments,
                           const WindowInfo &window_info,
                           const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
   // window_aggregate_. If it really needs to be seperated from the
   // AggregationFunction, a new class for WindowAggregationFunction should be
   // created as quickstep::WindowAggregateFunction.
-  const ::quickstep::AggregateFunction &window_aggregate_;
+  const ::quickstep::WindowAggregateFunction &window_aggregate_;
   std::vector<ScalarPtr> arguments_;
   const WindowInfo window_info_;
   const std::string window_name_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_expressions_tablegenerator_GeneratorFunction
                       quickstep_expressions_tablegenerator_GeneratorFunctionFactory
                       quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_parser_ParseAssignment
                       quickstep_parser_ParseBasicExpressions
                       quickstep_parser_ParseBlockProperties

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..c224388 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
 #include "expressions/table_generator/GeneratorFunction.hpp"
 #include "expressions/table_generator/GeneratorFunctionFactory.hpp"
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "parser/ParseAssignment.hpp"
 #include "parser/ParseBasicExpressions.hpp"
 #include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << "COUNT aggregate has both star (*) and non-star arguments.";
   }
 
-  // Try to look up the AggregateFunction by name using
-  // AggregateFunctionFactory.
-  const ::quickstep::AggregateFunction *aggregate
-      = AggregateFunctionFactory::GetByName(function_name);
-  if (aggregate == nullptr) {
+  // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+  // TODO(Shixuan): We might want to create a new abstract class Function to
+  // include both AggregateFunction and WindowAggregateFunction, which will make
+  // this part of code cleaner.
+  const ::quickstep::AggregateFunction *aggregate = nullptr;
+  const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
+  if (parse_function_call.isWindow()) {
+    window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+  } else {
+    aggregate = AggregateFunctionFactory::GetByName(function_name);
+  }
+
+  if (aggregate == nullptr && window_aggregate == nullptr) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Unrecognized function name \""
         << parse_function_call.name()->value()
@@ -2656,11 +2666,14 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure a naked COUNT() with no arguments wasn't specified.
-  if ((aggregate->getAggregationID() == AggregationID::kCount)
-      && (resolved_arguments.empty())
-      && (!count_star)) {
-    THROW_SQL_ERROR_AT(&parse_function_call)
-        << "COUNT aggregate requires an argument (either scalar or star (*))";
+  if ((aggregate != nullptr &&
+       aggregate->getAggregationID() == AggregationID::kCount) ||
+      (window_aggregate != nullptr &&
+       window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+    if ((resolved_arguments.empty()) && !count_star) {
+      THROW_SQL_ERROR_AT(&parse_function_call)
+          << "COUNT aggregate requires an argument (either scalar or star (*))";
+    }
   }
 
   // Resolve each of the Scalar arguments to the aggregate.
@@ -2670,7 +2683,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }
 
   // Make sure that the aggregate can apply to the specified argument(s).
-  if (!aggregate->canApplyToTypes(argument_types)) {
+  if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+      || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Aggregate function " << aggregate->getName()
         << " can not apply to the given argument(s).";
@@ -2679,7 +2693,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   if (parse_function_call.isWindow()) {
     return resolveWindowAggregateFunction(parse_function_call,
                                           expression_resolution_info,
-                                          aggregate,
+                                          window_aggregate,
                                           resolved_arguments);
   }
 
@@ -2705,7 +2719,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
 E::ScalarPtr Resolver::resolveWindowAggregateFunction(
     const ParseFunctionCall &parse_function_call,
     ExpressionResolutionInfo *expression_resolution_info,
-    const ::quickstep::AggregateFunction *window_aggregate,
+    const ::quickstep::WindowAggregateFunction *window_aggregate,
     const std::vector<E::ScalarPtr> &resolved_arguments) {
   // A window aggregate function might be defined OVER a window name or a window.
   E::WindowAggregateFunctionPtr window_aggregate_function;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
    * @param expression_resolution_info Resolution info that contains the name
    *                                   resolver and info to be updated after
    *                                   resolution.
-   * @param aggregate The aggregate function.
+   * @param aggregate The window aggregate function.
    * @param resolved_arguments The resolved arguments.
    * @return An expression in the query optimizer.
    */
   expressions::ScalarPtr resolveWindowAggregateFunction(
       const ParseFunctionCall &parse_function_call,
       ExpressionResolutionInfo *expression_resolution_info,
-      const ::quickstep::AggregateFunction *aggregate,
+      const ::quickstep::WindowAggregateFunction *aggregate,
       const std::vector<expressions::ScalarPtr> &resolved_arguments);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 16127cc..30a3c39 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -953,19 +953,79 @@ WHERE double_col < 0
 ==
 
 # Window Aggregation Test.
-# Currently this is not supported, an empty table will be returned.
-SELECT avg(int_col) OVER w FROM test
+SELECT char_col, long_col, avg(long_col) OVER w FROM test
 WINDOW w AS
-(PARTITION BY char_col
- ORDER BY long_col DESC NULLS LAST
+(ORDER BY char_col DESC NULLS LAST
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
 --
-+------------------------+
-|avg(int_col)            |
-+------------------------+
-+------------------------+
++--------------------+--------------------+------------------------+
+|char_col            |long_col            |avg(long_col)           |
++--------------------+--------------------+------------------------+
+|          8 2.828427|                  64|                      64|
+|          6 2.449490|                  36|                      50|
+|          4 2.000000|                  16|      38.666666666666664|
+|         24 4.898979|                 576|                     173|
+|         22 4.690416|                 484|      235.19999999999999|
+|         20 4.472136|                 400|      262.66666666666669|
+|          2 1.414214|                   4|      225.71428571428572|
+|         18 4.242641|                 324|                     238|
+|         16 4.000000|                 256|                     240|
+|         14 3.741657|                 196|      235.59999999999999|
+|         12 3.464102|                 144|      227.27272727272728|
+|         10 3.162278|                 100|      216.66666666666666|
+|          0 0.000000|                   0|                     200|
+|         -9 3.000000|                  81|                   191.5|
+|         -7 2.645751|                  49|                     182|
+|         -5 2.236068|                  25|                172.1875|
+|         -3 1.732051|                   9|      162.58823529411765|
+|        -23 4.795832|                 529|      182.94444444444446|
+|        -21 4.582576|                 441|      196.52631578947367|
+|        -19 4.358899|                 361|                  204.75|
+|        -17 4.123106|                 289|      208.76190476190476|
+|        -15 3.872983|                 225|                   209.5|
+|        -13 3.605551|                 169|       207.7391304347826|
+|        -11 3.316625|                 121|                 204.125|
+|         -1 1.000000|                   1|                     196|
++--------------------+--------------------+------------------------+
 ==
 
+SELECT long_col, int_col, avg(int_col) OVER w FROM test
+WINDOW w AS
+(ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
+--
++--------------------+-----------+------------------------+
+|long_col            |int_col    |avg(int_col)            |
++--------------------+-----------+------------------------+
+|                 576|         24|       7.666666666666667|
+|                 529|        -23|                     0.5|
+|                 484|         22|                     0.5|
+|                 441|        -21|                  -10.25|
+|                 400|       NULL|                       0|
+|                 361|        -19|                   -9.75|
+|                 324|         18|                    -0.5|
+|                 289|        -17|     -3.3999999999999999|
+|                 256|         16|      3.2000000000000002|
+|                 225|        -15|                      -3|
+|                 196|         14|      2.7999999999999998|
+|                 169|        -13|     -2.6000000000000001|
+|                 144|         12|                     0.5|
+|                 121|        -11|                   -5.25|
+|                 100|       NULL|                       0|
+|                  81|         -9|                   -4.75|
+|                  64|          8|                    -0.5|
+|                  49|         -7|     -1.3999999999999999|
+|                  36|          6|                     1.2|
+|                  25|         -5|                      -1|
+|                  16|          4|     0.80000000000000004|
+|                   9|         -3|    -0.59999999999999998|
+|                   4|          2|                     0.5|
+|                   1|         -1|    -0.66666666666666663|
+|                   0|       NULL|                     0.5|
++--------------------+-----------+------------------------+
+==
+
+# Currently this is not supported; an empty table will be returned.
 SELECT int_col, sum(float_col) OVER
 (PARTITION BY char_col, long_col
  ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
@@ -987,5 +1047,5 @@ WINDOW w AS
 +------------------------+
 |sum(avg(int_col))       |
 +------------------------+
-|                    NULL|
+|                     -18|
 +------------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
+                      quickstep_storage_WindowAggregationOperationState
                       quickstep_utility_Macros
                       tmb)                      
 target_link_libraries(quickstep_relationaloperators_WorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..3149864 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
 
 #include <vector>
 
+#include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
 
 #include "tmb/id_typedefs.h"
 
@@ -40,10 +42,14 @@ bool WindowAggregationOperator::getAllWorkOrders(
   DCHECK(query_context != nullptr);
 
   if (blocking_dependencies_met_ && !generated_) {
+    std::vector<block_id> relation_blocks =
+        input_relation_.getBlocksSnapshot();
+
     container->addNormalWorkOrder(
         new WindowAggregationWorkOrder(
             query_id_,
             query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+            std::move(relation_blocks),
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     generated_ = true;
@@ -67,6 +73,11 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
                       window_aggregation_state_index_);
+
+  const std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+  for (const block_id bid : relation_blocks) {
+    proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+  }
   proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
                       output_destination_index_);
 
@@ -75,8 +86,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 
 
 void WindowAggregationWorkOrder::execute() {
-  std::cout << "Window aggregation is not supported yet.\n"
-      << "An empty table is returned\n";
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..bd83248 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of this query.
    * @param input_relation The relation to perform aggregation over.
+   * @param output_relation The relation for output.
    * @param window_aggregation_state_index The index of WindowAggregationState
    *                                       in QueryContext.
    * @param output_destination_index The index of InsertDestination in
    *                                 QueryContext for the output.
    **/
   WindowAggregationOperator(const std::size_t query_id,
+                            const CatalogRelation &input_relation,
                             const CatalogRelation &output_relation,
                             const QueryContext::window_aggregation_state_id window_aggregation_state_index,
                             const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         window_aggregation_state_index_(window_aggregation_state_index),
         output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
    **/
   serialization::WorkOrder* createWorkOrderProto();
 
+  const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
   const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
   const QueryContext::insert_destination_id output_destination_index_;
@@ -117,43 +121,25 @@ class WindowAggregationWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of this query.
    * @param state The WindowAggregationOperatorState to use.
+   * @param block_ids The IDs of the blocks of the input relation.
    * @param output_destination The InsertDestination for output.
    **/
   WindowAggregationWorkOrder(const std::size_t query_id,
                              WindowAggregationOperationState *state,
+                             std::vector<block_id> &&block_ids,
                              InsertDestination *output_destination)
       : WorkOrder(query_id),
         state_(state),
+        block_ids_(std::move(block_ids)),
         output_destination_(output_destination)  {}
 
   ~WindowAggregationWorkOrder() override {}
 
-  /**
-   * @brief Get the pointer to WindowAggregationOperationState.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the window aggregation operation state.
-   **/
-  WindowAggregationOperationState* state() {
-    return state_;
-  }
-
-  /**
-   * @brief Get the pointer to output destination.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the output destination.
-   **/
-  InsertDestination* output_destination() {
-    return output_destination_;
-  }
-
   void execute() override;
 
  private:
   WindowAggregationOperationState *state_;
+  const std::vector<block_id> block_ids_;
   InsertDestination *output_destination_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
   extend WorkOrder {
     // All required
     optional uint32 window_aggr_state_index = 336;
-    optional int32 insert_destination_index = 337;
+    repeated fixed64 block_ids = 337;
+    optional int32 insert_destination_index = 338;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..582effd 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,26 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_ExpressionFactories
                       quickstep_expressions_Expressions_proto
-                      quickstep_expressions_aggregation_AggregateFunction
-                      quickstep_expressions_aggregation_AggregateFunctionFactory
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_storage_WindowAggregationOperationState_proto
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorUtil
+                      quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
-                      quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_expressions_Expressions_proto
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..0cdfc1a 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,14 +31,21 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/ExpressionFactories.hpp"
 #include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/ColumnVectorUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -46,23 +53,21 @@ namespace quickstep {
 
 WindowAggregationOperationState::WindowAggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    const AggregateFunction *window_aggregate_function,
+    const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
-    std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
     const bool is_row,
     const std::int64_t num_preceding,
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       arguments_(std::move(arguments)),
-      partition_by_attributes_(std::move(partition_by_attributes)),
       is_row_(is_row),
       num_preceding_(num_preceding),
       num_following_(num_following),
       storage_manager_(storage_manager) {
   // Get the Types of this window aggregate's arguments so that we can create an
   // AggregationHandle.
-  // TODO(Shixuan): Next step: New handles for window aggregation function.
   std::vector<const Type*> argument_types;
   for (const std::unique_ptr<const Scalar> &argument : arguments_) {
     argument_types.emplace_back(&argument->getType());
@@ -71,28 +76,18 @@ WindowAggregationOperationState::WindowAggregationOperationState(
   // Check if window aggregate function could apply to the arguments.
   DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
 
+  // IDs and types of partition keys.
+  std::vector<const Type*> partition_by_types;
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    partition_by_ids_.push_back(
+        partition_by_attribute->getAttributeIdForValueAccessor());
+    partition_by_types.push_back(&partition_by_attribute->getType());
+  }
+
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(argument_types));
-  window_aggregation_state_.reset(
-      window_aggregation_handle_->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // See if all of this window aggregate's arguments are attributes in the input
-  // relation. If so, remember the attribute IDs so that we can do copy elision
-  // when actually performing the window aggregation.
-  arguments_as_attributes_.reserve(arguments_.size());
-  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
-    const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
-    if (argument_id == -1) {
-      arguments_as_attributes_.clear();
-      break;
-    } else {
-      DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
-      arguments_as_attributes_.push_back(argument_id);
-    }
-  }
-#endif
+      window_aggregate_function->createHandle(std::move(argument_types),
+                                              std::move(partition_by_types)));
 }
 
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -101,10 +96,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
     StorageManager *storage_manager) {
   DCHECK(ProtoIsValid(proto, database));
 
-  // Rebuild contructor arguments from their representation in 'proto'.
-  const AggregateFunction *aggregate_function
-      = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
-
   std::vector<std::unique_ptr<const Scalar>> arguments;
   arguments.reserve(proto.arguments_size());
   for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
@@ -126,10 +117,10 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
 
-  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
-                                             aggregate_function,
+  return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+                                             &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
                                              std::move(arguments),
-                                             std::move(partition_by_attributes),
+                                             partition_by_attributes,
                                              is_row,
                                              num_preceding,
                                              num_following,
@@ -139,11 +130,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
                                                    const CatalogDatabaseLite &database) {
   if (!proto.IsInitialized() ||
-      !database.hasRelationWithId(proto.relation_id())) {
+      !database.hasRelationWithId(proto.input_relation_id())) {
     return false;
   }
 
-  if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+  if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
     return false;
   }
 
@@ -176,4 +167,122 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
   return true;
 }
 
+void WindowAggregationOperationState::windowAggregateBlocks(
+    InsertDestination *output_destination,
+    const std::vector<block_id> &block_ids) {
+  // TODO(Shixuan): This is a quick fix for currently unsupported functions in
+  // order to pass the query_optimizer test.
+  if (window_aggregation_handle_.get() == nullptr) {
+    std::cout << "The function will be supported in the near future :)\n";
+    return;
+  }
+
+  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
+  // work. This will need Order Key to be passed so that we know where the
+  // window should start and end.
+  if (!is_row_) {
+    std::cout << "Currently we don't support RANGE frame mode :(\n";
+    return;
+  }
+
+  // Get the total number of tuples.
+  int num_tuples = 0;
+  for (const block_id block_idx : block_ids) {
+    num_tuples +=
+        storage_manager_->getBlock(block_idx, input_relation_)->getNumTuples();
+  }
+
+  // Construct column vectors for attributes.
+  std::vector<ColumnVector*> attribute_vecs;
+  for (std::size_t attr_id = 0; attr_id < input_relation_.size(); ++attr_id) {
+    const CatalogAttribute *attr = input_relation_.getAttributeById(attr_id);
+    const Type &type = attr->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      attribute_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      attribute_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // Construct column vectors for arguments.
+  std::vector<ColumnVector*> argument_vecs;
+  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+    const Type &type = argument->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      argument_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      argument_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // TODO(Shixuan): Add Support for Vector Copy Elision Selection.
+  // Add tuples and arguments into ColumnVectors.
+  for (const block_id block_idx : block_ids) {
+    BlockReference block = storage_manager_->getBlock(block_idx, input_relation_);
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
+    ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+    for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+      argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
+                                                          &sub_block_ref));
+    }
+
+    InvokeOnAnyValueAccessor(tuple_accessor,
+                             [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
+      tuple_accessor->beginIteration();
+      argument_accessor->beginIteration();
+
+      while (tuple_accessor->next() && argument_accessor->next()) {
+        for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
+          ColumnVector *attr_vec = attribute_vecs[attr_id];
+          if (attr_vec->isNative()) {
+            static_cast<NativeColumnVector*>(attr_vec)->appendTypedValue(
+                tuple_accessor->getTypedValue(attr_id));
+          } else {
+            static_cast<IndirectColumnVector*>(attr_vec)->appendTypedValue(
+                tuple_accessor->getTypedValue(attr_id));
+          }
+        }
+
+        for (std::size_t argument_idx = 0;
+             argument_idx < argument_vecs.size();
+             ++argument_idx) {
+          ColumnVector *argument = argument_vecs[argument_idx];
+          if (argument->isNative()) {
+            static_cast<NativeColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          } else {
+            static_cast<IndirectColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          }
+        }
+      }
+    });
+  }
+
+  // Construct the value accessor for tuples in all blocks
+  ColumnVectorsValueAccessor *all_blocks_accessor
+      = new ColumnVectorsValueAccessor();
+  for (ColumnVector *attr_vec : attribute_vecs) {
+    all_blocks_accessor->addColumn(attr_vec);
+  }
+
+  // Do actual calculation in handle.
+  ColumnVector *window_aggregates =
+      window_aggregation_handle_->calculate(all_blocks_accessor,
+                                            std::move(argument_vecs),
+                                            partition_by_ids_,
+                                            is_row_,
+                                            num_preceding_,
+                                            num_following_);
+
+  all_blocks_accessor->addColumn(window_aggregates);
+  output_destination->bulkInsertTuples(all_blocks_accessor);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..9792a99 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,20 +25,20 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WindowAggregateFunction;
 
 /** \addtogroup Storage
  *  @{
@@ -63,13 +63,12 @@ class WindowAggregationOperationState {
    *                      current row. -1 means UNBOUNDED PRECEDING.
    * @param num_following The number of rows/range for the tuples following the
    *                      current row. -1 means UNBOUNDED FOLLOWING.
-   * @param storage_manager The StorageManager to use for allocating hash
-   *        tables.
+   * @param storage_manager The StorageManager used to obtain block references.
    */
   WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
-                                  const AggregateFunction *window_aggregate_function,
+                                  const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
-                                  std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+                                  const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
                                   const bool is_row,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following,
@@ -107,66 +106,29 @@ class WindowAggregationOperationState {
                            const CatalogDatabaseLite &database);
 
   /**
-   * @brief Get the is_row info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   * 
-   * @return True if the frame mode is ROW, false if it is RANGE.
-   **/
-  const bool is_row() const { return is_row_; }
-
-  /**
-   * @brief Get the num_preceding info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return The number of rows/range that precedes the current row.
-   **/
-  const std::int64_t num_preceding() const { return num_preceding_; }
-
-  /**
-   * @brief Get the num_following info.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
+   * @brief Compute window aggregates on the tuples of the given relation.
    *
-   * @return The number of rows/range that follows the current row.
+   * @param output_destination The output destination for the computed window
+   *                           aggregate.
+   * @param block_ids The IDs of the input blocks whose tuples are aggregated.
    **/
-  const std::int64_t num_following() const { return num_following_; }
-
-  /**
-   * @brief Get the pointer to StorageManager.
-   * @note This is a quickfix for "unused variable". After the window aggregate
-   *       functions are built, these methods might be dropped.
-   *
-   * @return A pointer to the storage manager.
-   **/
-  StorageManager *storage_manager() { return storage_manager_; }
+  void windowAggregateBlocks(InsertDestination *output_destination,
+                             const std::vector<block_id> &block_ids);
 
  private:
   const CatalogRelationSchema &input_relation_;
-
-  // TODO(Shixuan): Handle and State for window aggregation will be needed for
-  //                actual calculation.
-  std::unique_ptr<AggregationHandle> window_aggregation_handle_;
-  std::unique_ptr<AggregationState> window_aggregation_state_;
+  const std::vector<block_id> block_ids_;
+  std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
   std::vector<std::unique_ptr<const Scalar>> arguments_;
+  std::vector<attribute_id> partition_by_ids_;
 
-  // We don't add order_by_attributes here since it is not needed after sorting.
-  std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
-  // Window framing information.
+  // Frame info.
   const bool is_row_;
   const std::int64_t num_preceding_;
   const std::int64_t num_following_;
 
   StorageManager *storage_manager_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<attribute_id> arguments_as_attributes_;
-#endif
-
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..d888461 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,12 +19,12 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
 import "expressions/Expressions.proto";
 
 message WindowAggregationOperationState {
-  required int32 relation_id = 1;
-  required AggregateFunction function = 2;
+  required int32 input_relation_id = 1;
+  required WindowAggregateFunction function = 2;
   repeated Scalar arguments = 3;
   repeated Scalar partition_by_attributes = 4;
   required bool is_row = 5;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(kInvalidTableId);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(kInvalidTableId);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kInvalidNum);
   proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
 
 TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
   serialization::WindowAggregationOperationState proto;
-  proto.set_relation_id(rel_id_);
-  proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+  proto.set_input_relation_id(rel_id_);
+  proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
   proto.set_is_row(true);
   proto.set_num_preceding(kValidNum);
   proto.set_num_following(kValidNum);


[4/4] incubator-quickstep git commit: - Supported ROWS mode for AVG window aggregation. - Created WindowAggregateFunctions in expressions/window_aggregation. - Created WindowAggregationHandle for AVG to actually do the calculation. - Other functions will

Posted by zu...@apache.org.
- Supported ROWS mode for AVG window aggregation.
- Created WindowAggregateFunctions in expressions/window_aggregation.
- Created WindowAggregationHandle for AVG to actually do the calculation.
- Other functions will be supported in future PRs.
- RANGE mode is not supported yet.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c0bb4620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c0bb4620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c0bb4620

Branch: refs/heads/SQL-window-aggregation
Commit: c0bb4620ca7d3e2ba6334efc009376164d2cd6f8
Parents: 2b78380
Author: shixuan-fan <sh...@apache.org>
Authored: Fri Jul 8 18:23:47 2016 +0000
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Jul 25 13:21:40 2016 -0700

----------------------------------------------------------------------
 expressions/CMakeLists.txt                      |   1 +
 expressions/window_aggregation/CMakeLists.txt   | 206 ++++++++++
 .../WindowAggregateFunction.cpp                 |  58 +++
 .../WindowAggregateFunction.hpp                 | 149 +++++++
 .../WindowAggregateFunction.proto               |  32 ++
 .../WindowAggregateFunctionAvg.cpp              |  85 ++++
 .../WindowAggregateFunctionAvg.hpp              |  75 ++++
 .../WindowAggregateFunctionCount.cpp            |  59 +++
 .../WindowAggregateFunctionCount.hpp            |  75 ++++
 .../WindowAggregateFunctionFactory.cpp          | 106 +++++
 .../WindowAggregateFunctionFactory.hpp          |  96 +++++
 .../WindowAggregateFunctionMax.cpp              |  67 ++++
 .../WindowAggregateFunctionMax.hpp              |  75 ++++
 .../WindowAggregateFunctionMin.cpp              |  66 ++++
 .../WindowAggregateFunctionMin.hpp              |  75 ++++
 .../WindowAggregateFunctionSum.cpp              |  82 ++++
 .../WindowAggregateFunctionSum.hpp              |  75 ++++
 .../WindowAggregationHandle.hpp                 | 137 +++++++
 .../WindowAggregationHandleAvg.cpp              | 241 ++++++++++++
 .../WindowAggregationHandleAvg.hpp              | 101 +++++
 .../window_aggregation/WindowAggregationID.hpp  |  44 +++
 .../WindowAggregationHandleAvg_unittest.cpp     | 387 +++++++++++++++++++
 query_optimizer/CMakeLists.txt                  |   2 +
 query_optimizer/ExecutionGenerator.cpp          |   5 +-
 query_optimizer/expressions/CMakeLists.txt      |   2 +-
 .../expressions/WindowAggregateFunction.cpp     |   4 +-
 .../expressions/WindowAggregateFunction.hpp     |  10 +-
 query_optimizer/resolver/CMakeLists.txt         |   2 +
 query_optimizer/resolver/Resolver.cpp           |  40 +-
 query_optimizer/resolver/Resolver.hpp           |   5 +-
 .../tests/execution_generator/Select.test       |  78 +++-
 relational_operators/CMakeLists.txt             |   1 +
 .../WindowAggregationOperator.cpp               |  15 +-
 .../WindowAggregationOperator.hpp               |  30 +-
 relational_operators/WorkOrder.proto            |   3 +-
 storage/CMakeLists.txt                          |  17 +-
 storage/WindowAggregationOperationState.cpp     | 183 +++++++--
 storage/WindowAggregationOperationState.hpp     |  68 +---
 storage/WindowAggregationOperationState.proto   |   6 +-
 ...WindowAggregationOperationState_unittest.cpp |  14 +-
 40 files changed, 2613 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index 53ad5d4..6ef3c24 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(aggregation)
 add_subdirectory(predicate)
 add_subdirectory(scalar)
 add_subdirectory(table_generator)
+add_subdirectory(window_aggregation)
 
 QS_PROTOBUF_GENERATE_CPP(expressions_Expressions_proto_srcs
                          expressions_Expressions_proto_hdrs

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
new file mode 100644
index 0000000..6a16fcc
--- /dev/null
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -0,0 +1,206 @@
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+#   Unless required by applicable law or agreed to in writing,
+#   software distributed under the License is distributed on an
+#   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#   KIND, either express or implied.  See the License for the
+#   specific language governing permissions and limitations
+#   under the License.
+
+QS_PROTOBUF_GENERATE_CPP(expressions_windowaggregation_WindowAggregateFunction_proto_srcs
+                         expressions_windowaggregation_WindowAggregateFunction_proto_hdrs
+                         WindowAggregateFunction.proto)
+
+# Declare micro-libs:
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction
+            WindowAggregateFunction.cpp
+            WindowAggregateFunction.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+            ${expressions_windowaggregation_WindowAggregateFunction_proto_srcs})
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+            WindowAggregateFunctionAvg.cpp
+            WindowAggregateFunctionAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+            WindowAggregateFunctionCount.cpp
+            WindowAggregateFunctionCount.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+            WindowAggregateFunctionFactory.cpp
+            WindowAggregateFunctionFactory.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+            WindowAggregateFunctionMax.cpp
+            WindowAggregateFunctionMax.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+            WindowAggregateFunctionMin.cpp
+            WindowAggregateFunctionMin.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+            WindowAggregateFunctionSum.cpp
+            WindowAggregateFunctionSum.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
+            ../../empty_src.cpp
+            WindowAggregationHandle.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+            WindowAggregationHandleAvg.cpp
+            WindowAggregationHandleAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationID
+            ../../empty_src.cpp
+            WindowAggregationID.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_ComparisonFactory
+                      quickstep_types_operations_comparisons_ComparisonID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_ComparisonFactory
+                      quickstep_types_operations_comparisons_ComparisonID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+                      glog
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      glog
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_ComparisonFactory
+                      quickstep_types_operations_comparisons_ComparisonID
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_storage_ValueAccessor
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_types_operations_comparisons_Comparison
+                      quickstep_utility_Macros)
+
+# Submodule all-in-one library:
+add_library(quickstep_expressions_windowaggregation ../../empty_src.cpp)
+target_link_libraries(quickstep_expressions_windowaggregation
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      quickstep_expressions_windowaggregation_WindowAggregationID)
+
+# Tests:
+
+# Unified executable to amortize cost of linking.
+add_executable(WindowAggregationHandle_tests
+               "${CMAKE_CURRENT_SOURCE_DIR}/tests/WindowAggregationHandleAvg_unittest.cpp")
+target_link_libraries(WindowAggregationHandle_tests
+                      gtest
+                      gtest_main
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+                      quickstep_expressions_windowaggregation_WindowAggregationHandle
+                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+                      quickstep_expressions_windowaggregation_WindowAggregationID
+                      quickstep_storage_ValueAccessor
+                      quickstep_types_CharType
+                      quickstep_types_DateOperatorOverloads
+                      quickstep_types_DatetimeIntervalType
+                      quickstep_types_DatetimeType
+                      quickstep_types_DoubleType
+                      quickstep_types_FloatType
+                      quickstep_types_IntType
+                      quickstep_types_IntervalLit
+                      quickstep_types_LongType
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_VarCharType
+                      quickstep_types_YearMonthIntervalType
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor)
+add_test(WindowAggregationHandle_tests WindowAggregationHandle_tests)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.cpp b/expressions/window_aggregation/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..3911e1c
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.cpp
@@ -0,0 +1,58 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+serialization::WindowAggregateFunction WindowAggregateFunction::getProto() const {
+  serialization::WindowAggregateFunction proto;
+  switch (win_agg_id_) {
+    case WindowAggregationID::kAvg:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
+      break;
+    case WindowAggregationID::kCount:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::COUNT);
+      break;
+    case WindowAggregationID::kMax:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::MAX);
+      break;
+    case WindowAggregationID::kMin:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::MIN);
+      break;
+    case WindowAggregationID::kSum:
+      proto.set_window_aggregation_id(serialization::WindowAggregateFunction::SUM);
+      break;
+    default: {
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<std::underlying_type<WindowAggregationID>::type>(win_agg_id_);
+    }
+  }
+
+  return proto;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..e40479b
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -0,0 +1,149 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief A class representing a particular window aggregate function in the
+ *        abstract sense. Each named aggregate function is represented by a
+ *        singleton subclass of WindowAggregateFunction.
+ *
+ * WindowAggregateFunction provides informational methods about the
+ * applicability of a particular window aggregate function to particular
+ * argument Type(s). The actual implementation of the window aggregate
+ * functions' logic is in the WindowAggregationHandle class hierarchy, and can
+ * be different depending on the particular argument Type(s) given to the window
+ * aggregate. To perform a window aggregation, a caller should first call
+ * WindowAggregateFunction::createHandle() to instantiate a
+ * WindowAggregationHandle object, then use the methods of
+ * WindowAggregationHandle to do the actual window aggregation. Finally, delete
+ * the WindowAggregationHandle once it is no longer needed.
+ * 
+ * See WindowAggregationHandle for more detailed information about how
+ * window aggregates are actually computed.
+ **/
+class WindowAggregateFunction {
+ public:
+  /**
+   * @brief Get the ID of this window aggregate (i.e. its unique ID amongst all
+   *        the WindowAggregateFunctions).
+   *
+   * @return The WindowAggregationID of this WindowAggregateFunction.
+   **/
+  inline WindowAggregationID getWindowAggregationID() const {
+    return win_agg_id_;
+  }
+
+  /**
+   * @brief Get the human-readable name of this WindowAggregateFunction.
+   *
+   * @return The human-readable name of this WindowAggregateFunction.
+   **/
+  virtual std::string getName() const = 0;
+
+  /**
+   * @brief Get the serialized protocol buffer representation of this
+   *        WindowAggregateFunction.
+   *
+   * @return A serialized protocol buffer representation of this
+   *         WindowAggregateFunction.
+   **/
+  virtual serialization::WindowAggregateFunction getProto() const;
+
+  /**
+   * @brief Determine if this WindowAggregateFunction can be applied to
+   *        arguments of particular Type(s).
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @return Whether this WindowAggregateFunction is applicable to the given
+   *         argument_types.
+   **/
+  virtual bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const = 0;
+
+  /**
+   * @brief Determine the result Type for this WindowAggregateFunction given
+   *        arguments of particular Type(s).
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @return The result Type for this WindowAggregateFunction applied to the
+   *         specified argument_types, or nullptr if this
+   *         WindowAggregateFunction is not applicable to the specified Type(s).
+   **/
+  virtual const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const = 0;
+
+  /**
+   * @brief Create a WindowAggregationHandle to compute aggregates.
+   *
+   * @warning It is an error to call this method for argument_types which this
+   *          WindowAggregateFunction can not apply to. For safety, check
+   *          canApplyToTypes() first.
+   *
+   * @param argument_types A list of zero or more Types (in order) for
+   *        arguments to this WindowAggregateFunction.
+   * @param partition_key_types A list of zero or more Types for partition keys
+   *                            to this WindowAggregateFunction.
+   * 
+   * @return A new WindowAggregationHandle that can be used to compute this
+   *         WindowAggregateFunction over the specified argument_types. Caller
+   *         is responsible for deleting the returned object.
+   **/
+  virtual WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const = 0;
+
+ protected:
+  explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
+      : win_agg_id_(win_agg_id) {
+  }
+
+ private:
+  const WindowAggregationID win_agg_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.proto
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.proto b/expressions/window_aggregation/WindowAggregateFunction.proto
new file mode 100644
index 0000000..fe8d799
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.proto
@@ -0,0 +1,32 @@
+//   Licensed to the Apache Software Foundation (ASF) under one
+//   or more contributor license agreements.  See the NOTICE file
+//   distributed with this work for additional information
+//   regarding copyright ownership.  The ASF licenses this file
+//   to you under the Apache License, Version 2.0 (the
+//   "License"); you may not use this file except in compliance
+//   with the License.  You may obtain a copy of the License at
+// 
+//     http://www.apache.org/licenses/LICENSE-2.0
+// 
+//   Unless required by applicable law or agreed to in writing,
+//   software distributed under the License is distributed on an
+//   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//   KIND, either express or implied.  See the License for the
+//   specific language governing permissions and limitations
+//   under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+message WindowAggregateFunction {
+  enum WindowAggregationID {
+    AVG = 0;
+    COUNT = 1;
+    MAX = 2;
+    MIN = 3;
+    SUM = 4;    
+  }
+
+  required WindowAggregationID window_aggregation_id = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
new file mode 100644
index 0000000..bc31a53
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -0,0 +1,85 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionAvg::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // AVG takes exactly one argument, which must support both "x + x" and
+  // "x / double". The division check only runs once the addition check passed.
+  if (argument_types.size() != 1) {
+    return false;
+  }
+  const Type &operand = *argument_types.front();
+  const bool addable = BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+                           .canApplyToTypes(operand, operand);
+  return addable && BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+                        .canApplyToTypes(operand, TypeFactory::GetType(kDouble));
+}
+
+const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // An argument list AVG cannot handle has no result type.
+  if (!canApplyToTypes(argument_types)) {
+    return nullptr;
+  }
+
+  // Values are summed in the nullable version of the argument type. For extra
+  // headroom int is widened to nullable long and float to nullable double;
+  // every other type is summed as-is.
+  const Type *accumulator_type = &(argument_types.front()->getNullableVersion());
+  const auto accumulator_id = accumulator_type->getTypeID();
+  if (accumulator_id == kInt) {
+    accumulator_type = &TypeFactory::GetType(kLong, true);
+  } else if (accumulator_id == kFloat) {
+    accumulator_type = &TypeFactory::GetType(kDouble, true);
+  }
+
+  // The AVG result type is whatever dividing that sum type by double yields.
+  const auto &divide_op =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+  return divide_op.resultTypeForArgumentTypes(*accumulator_type, TypeFactory::GetType(kDouble));
+}
+
+WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+    const std::vector<const Type*> &argument_types,
+    const std::vector<const Type*> &partition_key_types) const {
+  // Unsupported argument lists are only caught by this DCHECK; the handle is allocated with new.
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
+      << " that AVG can not be applied to.";
+  const Type &argument_type = *argument_types.front();
+  return new WindowAggregationHandleAvg(partition_key_types, argument_type);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
new file mode 100644
index 0000000..32fd9d5
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL AVG() OVER term.
+ **/
+class WindowAggregateFunctionAvg : public WindowAggregateFunction {
+ public:
+  static const WindowAggregateFunctionAvg& Instance() {  // The one shared instance.
+    static WindowAggregateFunctionAvg instance;  // Function-local static, built on first call.
+    return instance;
+  }
+  // Name of this function; always the upper-case string "AVG".
+  std::string getName() const override {
+    return "AVG";
+  }
+  // True only for exactly one argument type accepted by kAdd (with itself) and kDivide (by double).
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // nullptr if canApplyToTypes() fails; otherwise the type of the widened sum divided by double.
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // Returns a WindowAggregationHandleAvg allocated with new; DCHECKs canApplyToTypes(argument_types).
+  WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+  WindowAggregateFunctionAvg()  // Private: objects are only created inside Instance().
+      : WindowAggregateFunction(WindowAggregationID::kAvg) {
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionAvg);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
new file mode 100644
index 0000000..504e000
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -0,0 +1,59 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionCount.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionCount::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // Accepted arities: zero arguments (i.e. COUNT(*)) or exactly one argument.
+  return argument_types.empty() || argument_types.size() == 1;
+}
+
+const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // COUNT yields TypeFactory's kLong type regardless of the argument type;
+  // an argument list COUNT rejects yields nullptr instead.
+  return canApplyToTypes(argument_types)
+             ? &TypeFactory::GetType(kLong)
+             : nullptr;
+}
+
+WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
+    const std::vector<const Type*> &argument_types,
+    const std::vector<const Type*> &partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create a WindowAggregationHandleCount for argument Types "
+      << "that COUNT can not be applied to (> 1 argument).";
+  // Placeholder: nullptr is returned and partition_key_types is not used.
+  // TODO(Shixuan): Add handle for Count.
+  return nullptr;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
new file mode 100644
index 0000000..1b40fdd
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL COUNT() OVER term.
+ **/
+class WindowAggregateFunctionCount : public WindowAggregateFunction {
+ public:
+  static const WindowAggregateFunctionCount& Instance() {  // The one shared instance.
+    static WindowAggregateFunctionCount instance;  // Function-local static, built on first call.
+    return instance;
+  }
+  // Name of this function; always the upper-case string "COUNT".
+  std::string getName() const override {
+    return "COUNT";
+  }
+  // True when at most one argument type is given (zero arguments is COUNT(*)).
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // nullptr if canApplyToTypes() fails; otherwise TypeFactory::GetType(kLong).
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // DCHECKs canApplyToTypes(argument_types); currently always returns nullptr (no handle yet).
+  WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+  WindowAggregateFunctionCount()  // Private: objects are only created inside Instance().
+      : WindowAggregateFunction(WindowAggregationID::kCount) {
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionCount);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
new file mode 100644
index 0000000..65247f2
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
@@ -0,0 +1,106 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+
+#include <string>
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionCount.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionMax.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionMin.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionSum.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::Get(
+    const WindowAggregationID agg_id) {
+  switch (agg_id) {  // Every recognized ID returns that function's Instance().
+    case WindowAggregationID::kAvg:
+      return WindowAggregateFunctionAvg::Instance();
+    case WindowAggregationID::kCount:
+      return WindowAggregateFunctionCount::Instance();
+    case WindowAggregationID::kMax:
+      return WindowAggregateFunctionMax::Instance();
+    case WindowAggregationID::kMin:
+      return WindowAggregateFunctionMin::Instance();
+    case WindowAggregationID::kSum:
+      return WindowAggregateFunctionSum::Instance();
+    default: {  // No return follows: presumably relies on LOG(FATAL) not returning.
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<std::underlying_type<WindowAggregationID>::type>(agg_id);
+    }
+  }
+}
+
+const WindowAggregateFunction* WindowAggregateFunctionFactory::GetByName(
+    const std::string &name) {
+  // Looks up a window aggregate function by name. The comparison is exact
+  // and case-sensitive, and only the lower-case spellings listed here are
+  // recognized; a match returns the address of that function's singleton.
+  if (name == "avg") return &WindowAggregateFunctionAvg::Instance();
+  if (name == "count") return &WindowAggregateFunctionCount::Instance();
+  if (name == "max") return &WindowAggregateFunctionMax::Instance();
+  if (name == "min") return &WindowAggregateFunctionMin::Instance();
+  if (name == "sum") return &WindowAggregateFunctionSum::Instance();
+
+  // No match: the name does not belong to any function handled above. The
+  // caller gets a null pointer rather than an error, so it has to check the
+  // result before dereferencing it.
+  return nullptr;
+}
+
+bool WindowAggregateFunctionFactory::ProtoIsValid(
+    const serialization::WindowAggregateFunction &proto) {
+  if (!proto.IsInitialized()) return false;  // Checked first, before the ID is read.
+  return serialization::WindowAggregateFunction::WindowAggregationID_IsValid(proto.window_aggregation_id());
+}
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::ReconstructFromProto(
+    const serialization::WindowAggregateFunction &proto) {
+  DCHECK(ProtoIsValid(proto))  // Validity is only asserted here, not handled.
+      << "Attempted to reconstruct an WindowAggregateFunction from an invalid proto:\n"
+      << proto.DebugString();
+  // Map each serialized ID onto the matching function's singleton.
+  switch (proto.window_aggregation_id()) {
+    case serialization::WindowAggregateFunction::AVG:
+      return WindowAggregateFunctionAvg::Instance();
+    case serialization::WindowAggregateFunction::COUNT:
+      return WindowAggregateFunctionCount::Instance();
+    case serialization::WindowAggregateFunction::MAX:
+      return WindowAggregateFunctionMax::Instance();
+    case serialization::WindowAggregateFunction::MIN:
+      return WindowAggregateFunctionMin::Instance();
+    case serialization::WindowAggregateFunction::SUM:
+      return WindowAggregateFunctionSum::Instance();
+    default: {  // No return follows: presumably relies on LOG(FATAL) not returning.
+      LOG(FATAL) << "Unrecognized serialization::WindowAggregateFunction::WindowAggregationID: "
+                 << proto.window_aggregation_id()
+                 << "\nFull proto debug string:\n"
+                 << proto.DebugString();
+    }
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
new file mode 100644
index 0000000..1d59e93
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
@@ -0,0 +1,96 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+
+#include <string>
+
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregateFunction;
+namespace serialization { class WindowAggregateFunction; }
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief All-static factory with methods that provide access to the various
+ *        concrete implementations of WindowAggregateFunction.
+ *
+ * WindowAggregateFunctionFactory allows client code to use any
+ * WindowAggregateFunction in Quickstep in a generic way without having to know
+ * about all the specific subclasses of WindowAggregateFunction. In particular,
+ * it is used to deserialize WindowAggregateFunctions used in
+ * WindowAggregationOperationState from their protobuf representations
+ * (originally created by the optimizer) when deserializing a QueryContext.
+ **/
+namespace WindowAggregateFunctionFactory {
+  /**
+   * @brief Get a particular WindowAggregateFunction by its ID.
+   *
+   * @param agg_id The ID of the desired WindowAggregateFunction.
+   * @return A reference to the singleton instance of the WindowAggregateFunction
+   *         specified by agg_id. An unrecognized agg_id is reported via LOG(FATAL).
+   **/
+  const WindowAggregateFunction& Get(const WindowAggregationID agg_id);
+
+  /**
+   * @brief Get a particular WindowAggregateFunction by its name in SQL syntax.
+   *
+   * @param name The name of the desired function in lower case (compared exactly, case-sensitively).
+   * @return A pointer to the WindowAggregateFunction specified by name, or nullptr
+   *         if name does not match any known WindowAggregateFunction.
+   **/
+  const WindowAggregateFunction* GetByName(const std::string &name);
+
+  /**
+   * @brief Determine if a serialized protobuf representation of a
+   *        WindowAggregateFunction is fully-formed and valid.
+   *
+   * @param proto A serialized protobuf representation of a
+   *              WindowAggregateFunction to check for validity.
+   * @return True if proto is initialized and its window_aggregation_id is a valid enum value.
+   **/
+  bool ProtoIsValid(const serialization::WindowAggregateFunction &proto);
+
+  /**
+   * @brief Get the WindowAggregateFunction represented by a proto.
+   *
+   * @warning It is an error to call this method with an invalid proto.
+   *          ProtoIsValid() should be called first to check.
+   *
+   * @param proto A serialized protobuf representation of a
+   *              WindowAggregateFunction.
+   * @return The singleton WindowAggregateFunction represented by proto.
+   **/
+  const WindowAggregateFunction& ReconstructFromProto(
+      const serialization::WindowAggregateFunction &proto);
+
+}  // namespace WindowAggregateFunctionFactory
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
new file mode 100644
index 0000000..f3997c7
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -0,0 +1,67 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionMax.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionMax::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // Only a single argument is accepted.
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  // MAX needs a '>' comparison between two values of the argument's own type.
+  const Type &operand = *argument_types.front();
+  const auto &greater_than = ComparisonFactory::GetComparison(ComparisonID::kGreater);
+  return greater_than.canCompareTypes(operand, operand);
+}
+
+const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // The result is the nullable version of the argument type; an argument
+  // list MAX rejects yields nullptr instead.
+  return canApplyToTypes(argument_types)
+             ? &(argument_types.front()->getNullableVersion())
+             : nullptr;
+}
+
+WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
+    const std::vector<const Type*> &argument_types,
+    const std::vector<const Type*> &partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
+      << "that MAX can not be applied to.";
+  // Placeholder: nullptr is returned and partition_key_types is not used.
+  // TODO(Shixuan): Add handle for Max.
+  return nullptr;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
new file mode 100644
index 0000000..00c788e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL MAX() OVER term.
+ **/
+class WindowAggregateFunctionMax : public WindowAggregateFunction {
+ public:
+  static const WindowAggregateFunctionMax& Instance() {  // The one shared instance.
+    static WindowAggregateFunctionMax instance;  // Function-local static, built on first call.
+    return instance;
+  }
+  // Name of this function; always the upper-case string "MAX".
+  std::string getName() const override {
+    return "MAX";
+  }
+  // True only for exactly one argument type that kGreater can compare with itself.
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // nullptr if canApplyToTypes() fails; otherwise the argument type's nullable version.
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // DCHECKs canApplyToTypes(argument_types); currently always returns nullptr (no handle yet).
+  WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+  WindowAggregateFunctionMax()  // Private: objects are only created inside Instance().
+      : WindowAggregateFunction(WindowAggregationID::kMax) {
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionMax);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
new file mode 100644
index 0000000..a13e28e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -0,0 +1,66 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionMin.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionMin::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // Only a single argument is accepted.
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  // MIN needs a '<' comparison between two values of the argument's own type.
+  const Type &operand = *argument_types.front();
+  const auto &less_than = ComparisonFactory::GetComparison(ComparisonID::kLess);
+  return less_than.canCompareTypes(operand, operand);
+}
+
+const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // The result is the nullable version of the argument type; an argument
+  // list MIN rejects yields nullptr instead.
+  return canApplyToTypes(argument_types)
+             ? &(argument_types.front()->getNullableVersion())
+             : nullptr;
+}
+
+WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
+    const std::vector<const Type*> &argument_types,
+    const std::vector<const Type*> &partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
+      << "that MIN can not be applied to.";
+  // Placeholder: no handle for MIN is created here yet; partition_key_types is not used.
+  return nullptr;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
new file mode 100644
index 0000000..aeba539
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL MIN() OVER term.
+ **/
+class WindowAggregateFunctionMin : public WindowAggregateFunction {
+ public:
+  static const WindowAggregateFunctionMin& Instance() {  // The one shared instance.
+    static WindowAggregateFunctionMin instance;  // Function-local static, built on first call.
+    return instance;
+  }
+  // Name of this function; always the upper-case string "MIN".
+  std::string getName() const override {
+    return "MIN";
+  }
+  // True only for exactly one argument type that kLess can compare with itself.
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // nullptr if canApplyToTypes() fails; otherwise the argument type's nullable version.
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+  // DCHECKs canApplyToTypes(argument_types); currently always returns nullptr (no handle yet).
+  WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+  WindowAggregateFunctionMin()  // Private: objects are only created inside Instance().
+      : WindowAggregateFunction(WindowAggregationID::kMin) {
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionMin);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
new file mode 100644
index 0000000..636c53a
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -0,0 +1,82 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionSum.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Reports whether SUM accepts the given argument types: there must be exactly
+// one argument, and the add operation must be applicable to that type paired
+// with itself.
+bool WindowAggregateFunctionSum::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // SUM is unary, so only a single-argument list reaches the addability check.
+  if (argument_types.size() == 1) {
+    const Type &operand_type = *argument_types.front();
+    return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+               .canApplyToTypes(operand_type, operand_type);
+  }
+
+  return false;
+}
+
+// Returns the Type that SUM produces for the given argument types, or nullptr
+// when SUM can not be applied to them.
+const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (!canApplyToTypes(argument_types)) {
+    return nullptr;
+  }
+
+  // The result is always nullable, since SUM may return NULL if there are no
+  // input rows.
+  const Type &nullable_argument_type =
+      argument_types.front()->getNullableVersion();
+
+  // Int is widened to Long and Float to Double, to have more headroom when
+  // adding up many values. Every other type is summed as itself.
+  switch (nullable_argument_type.getTypeID()) {
+    case kInt:
+      return &TypeFactory::GetType(kLong, true);
+    case kFloat:
+      return &TypeFactory::GetType(kDouble, true);
+    default:
+      return &nullable_argument_type;
+  }
+}
+
+// Entry point for creating the handle that computes SUM over a window.
+//
+// NOTE: This is currently a stub. It only checks (via DCHECK) that SUM can be
+// applied to 'argument_types' and then returns nullptr; no handle object is
+// constructed and 'partition_key_types' is not used.
+// TODO(review): callers presumably have to cope with the null result until a
+// SUM handle exists -- confirm at the call sites.
+WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
+    const std::vector<const Type*> &argument_types,
+    const std::vector<const Type*> &partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
+      << "that SUM can not be applied to.";
+
+  return nullptr;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
new file mode 100644
index 0000000..047113c
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -0,0 +1,75 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief Singleton WindowAggregateFunction for the SQL SUM() OVER term.
+ **/
+class WindowAggregateFunctionSum : public WindowAggregateFunction {
+ public:
+  /**
+   * @brief Get the shared instance of this function.
+   *
+   * @return A const reference to the function-local static instance.
+   **/
+  static const WindowAggregateFunctionSum& Instance() {
+    static WindowAggregateFunctionSum singleton;
+    return singleton;
+  }
+
+  /**
+   * @brief Get the name of this function.
+   *
+   * @return The string "SUM".
+   **/
+  std::string getName() const override {
+    return std::string("SUM");
+  }
+
+  // The remaining WindowAggregateFunction overrides are defined out of line.
+  bool canApplyToTypes(
+      const std::vector<const Type*> &argument_types) const override;
+
+  const Type* resultTypeForArgumentTypes(
+      const std::vector<const Type*> &argument_types) const override;
+
+  WindowAggregationHandle* createHandle(
+      const std::vector<const Type*> &argument_types,
+      const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+  // Private so that Instance() is the only way to obtain an object; the base
+  // class is tagged with WindowAggregationID::kSum.
+  WindowAggregateFunctionSum()
+      : WindowAggregateFunction(WindowAggregationID::kSum) {}
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionSum);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
new file mode 100644
index 0000000..65f95d9
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -0,0 +1,137 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class InsertDestinationInterface;
+class Scalar;
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief WindowAggregationHandle encapsulates logic for actually computing
+ *        window aggregates with particular argument(s).
+ * @note See also WindowAggregateFunction, which represents a SQL aggregate
+ *       function in the abstract sense.
+ *
+ * A WindowAggregationHandle is created by calling
+ * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
+ * provides methods that are used to actually compute the window aggregate,
+ * storing intermediate results in WindowAggregationState objects.
+ *
+ * The work flow for computing a window aggregate is:
+ *     1. Create an initial state by createInitialState().
+ *     2. One thread will handle all the computation, iterating from the first
+ *        tuple to the last tuple. One of two modes is used, depending on the
+ *        situation:
+ *        a. If the window aggregate is defined as accumulative, which are:
+ *           i.  Functions applied to whole partition, such as rank(), ntile()
+ *               and dense_rank().
+ *           ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
+ *               AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
+ *               FOLLOWING".
+ *           Then, for functions except median, we could store some global
+ *           values in the state without keeping all the tuple values around.
+ *        b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ *           3 FOLLOWING", we have to store all the tuples in the state so that
+ *           we could know which values should be dropped as the window slides.
+ *        For each computed value, generate a tuple and store it in the column
+ *        vector.
+ *     3. Insert the new column into the original relation and return.
+ *
+ * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
+ * parallelization is to calculate the partial result inside each block. Each
+ * block could visit the following blocks as long as the block's last partition
+ * is not finished. WindowAggregationOperationState will be used for handling
+ * the global state of the calculation.
+ **/
+
+// Abstract base class: concrete handles implement calculate(). The base class
+// only builds the per-partition-key equality comparators used to decide
+// whether two tuples fall in the same partition.
+class WindowAggregationHandle {
+ public:
+  /**
+   * @brief Destructor.
+   **/
+  virtual ~WindowAggregationHandle() {}
+
+  /**
+   * @brief Calculate the window aggregate result.
+   *
+   * @param block_accessors A pointer to the value accessor of block attributes.
+   * @param arguments The ColumnVectors of arguments. Passed as an rvalue
+   *        reference; NOTE(review): this presumably means the callee takes
+   *        over the vectors -- confirm against the concrete handles.
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current row.
+   * @param num_following The number of rows/range that follows the current row.
+   *
+   * @return A ColumnVector of the calculated window aggregates.
+   **/
+  virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                                  std::vector<ColumnVector*> &&arguments,
+                                  const std::vector<attribute_id> &partition_by_ids,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following) const = 0;
+
+ protected:
+  /**
+   * @brief Constructor.
+   *
+   * @param partition_key_types The Types of the partition key.
+   **/
+  explicit WindowAggregationHandle(
+      const std::vector<const Type*> &partition_key_types) {
+    // Comparison operators for checking if two tuples belong to the same partition.
+    // One comparator is created per partition key type, in the same order.
+    for (const Type *partition_key_type : partition_key_types) {
+      equal_comparators_.emplace_back(
+          ComparisonFactory::GetComparison(ComparisonID::kEqual)
+              .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
+    }
+  }
+
+  // Equality comparators, indexed like the partition key types passed to the
+  // constructor.
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
new file mode 100644
index 0000000..a6a10d4
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -0,0 +1,241 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Sets up the types and arithmetic operators needed to average values of
+// 'type' within partitions keyed by 'partition_key_types'.
+WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const std::vector<const Type*> &partition_key_types,
+    const Type &type)
+    : WindowAggregationHandle(partition_key_types),
+      argument_type_(type) {
+  // Int is summed as Long and Float as Double so that there is more headroom
+  // when adding many values. Any other type is summed as itself.
+  const TypeID argument_type_id = type.getTypeID();
+  TypeID sum_type_id = argument_type_id;
+  if (argument_type_id == kInt || argument_type_id == kLong) {
+    sum_type_id = kLong;
+  } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+    sum_type_id = kDouble;
+  }
+  sum_type_ = &(TypeFactory::GetType(sum_type_id));
+
+  const auto &divide_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+  const Type &count_type = TypeFactory::GetType(kDouble);
+
+  // The result must be nullable, because AVG() over 0 values (or all NULL
+  // values) is NULL.
+  result_type_ =
+      &(divide_operation.resultTypeForArgumentTypes(*sum_type_, count_type)
+            ->getNullableVersion());
+
+  // Operator that accumulates an argument value into the running sum.
+  fast_add_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
+  // Operator that divides the sum by the count to get the final average.
+  divide_operator_.reset(
+      divide_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_,
+                                                           count_type));
+}
+
+// Computes the windowed AVG for every tuple reachable through
+// 'tuple_accessor' and returns the results as a newly allocated column, one
+// value per tuple in iteration order. The caller receives ownership of the
+// returned ColumnVector.
+//
+// Exactly one argument column is expected; it must be native and have as many
+// entries as 'tuple_accessor' has tuples (DCHECKed below).
+ColumnVector* WindowAggregationHandleAvg::calculate(
+    ColumnVectorsValueAccessor *tuple_accessor,
+    std::vector<ColumnVector*> &&arguments,
+    const std::vector<attribute_id> &partition_by_ids,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  DCHECK_EQ(1u, arguments.size());
+  DCHECK(arguments[0]->isNative());
+  DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
+            static_cast<const NativeColumnVector*>(arguments[0])->size());
+
+  // Output column. It is held in a unique_ptr until it is handed back so that
+  // it can not leak if this function is left early.
+  std::unique_ptr<NativeColumnVector> window_aggregates(
+      new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples()));
+
+  // The argument accessor is only needed for the duration of this call, so it
+  // is released automatically on return instead of being leaked.
+  // NOTE(review): addColumn() is called with its default ownership setting;
+  // presumably the accessor then frees arguments[0] together with itself --
+  // confirm against ColumnVectorsValueAccessor.
+  std::unique_ptr<ColumnVectorsValueAccessor> argument_accessor(
+      new ColumnVectorsValueAccessor());
+  argument_accessor->addColumn(arguments[0]);
+
+  // Walk both accessors in lockstep and compute one window per tuple.
+  tuple_accessor->beginIteration();
+  argument_accessor->beginIteration();
+
+  while (tuple_accessor->next() && argument_accessor->next()) {
+    const TypedValue window_aggregate =
+        this->calculateOneWindow(tuple_accessor,
+                                 argument_accessor.get(),
+                                 partition_by_ids,
+                                 is_row,
+                                 num_preceding,
+                                 num_following);
+    window_aggregates->appendTypedValue(window_aggregate);
+  }
+
+  return window_aggregates.release();
+}
+
+// Computes the AVG over the window of the tuple that both accessors are
+// currently positioned on.
+//
+// The window consists of the current tuple, up to 'num_preceding' tuples
+// before it and up to 'num_following' tuples after it; a bound of -1 means
+// the window is unbounded in that direction. The window never extends past a
+// tuple whose partition key differs from the current tuple's. NULL argument
+// values are skipped. 'is_row' is not consulted here.
+//
+// Returns a NULL of the result type when the window has no non-NULL values,
+// otherwise the sum divided by the number of non-NULL values.
+TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    ColumnVectorsValueAccessor *tuple_accessor,
+    ColumnVectorsValueAccessor *argument_accessor,
+    const std::vector<attribute_id> &partition_by_ids,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  // Initialize.
+  TypedValue sum = sum_type_->makeZeroValue();
+  TypedValue current_value = argument_accessor->getTypedValue(0);
+  std::uint64_t count = 0;
+
+  // Ignore the value if null.
+  if (!current_value.isNull()) {
+    sum = fast_add_operator_->applyToTypedValues(sum, current_value);
+    count++;
+  }
+
+  // Get the partition key for the current row.
+  std::vector<TypedValue> current_row_partition_key;
+  current_row_partition_key.reserve(partition_by_ids.size());
+  for (attribute_id partition_by_id : partition_by_ids) {
+    current_row_partition_key.push_back(
+        tuple_accessor->getTypedValue(partition_by_id));
+  }
+
+  // Get current position.
+  tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
+
+  // Find preceding tuples. The counter has the same width as the frame bound
+  // so that a large bound can not overflow it.
+  std::int64_t count_preceding = 0;
+  tuple_id preceding_tuple_id = current_tuple_id;
+  while (num_preceding == -1 || count_preceding < num_preceding) {
+    preceding_tuple_id--;
+
+    // No more preceding tuples.
+    if (preceding_tuple_id < 0) {
+      break;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, stop searching preceding tuples.
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
+                       preceding_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    count_preceding++;
+    TypedValue preceding_value =
+        argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
+
+    // Ignore the value if null.
+    if (!preceding_value.isNull()) {
+      sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
+      count++;
+    }
+  }
+
+  // Find following tuples.
+  std::int64_t count_following = 0;
+  tuple_id following_tuple_id = current_tuple_id;
+  while (num_following == -1 || count_following < num_following) {
+    following_tuple_id++;
+
+    // No more following tuples.
+    if (following_tuple_id == tuple_accessor->getNumTuples()) {
+      break;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, stop searching following tuples.
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
+                       following_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    count_following++;
+    TypedValue following_value =
+        argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
+
+    // Ignore the value if null.
+    if (!following_value.isNull()) {
+      sum = fast_add_operator_->applyToTypedValues(sum, following_value);
+      count++;
+    }
+  }
+
+  // If all values are NULLs, return NULL; Otherwise, return the quotient.
+  if (count == 0) {
+    return result_type_->makeNullValue();
+  } else {
+    return divide_operator_->applyToTypedValues(sum,
+                                                TypedValue(static_cast<double>(count)));
+  }
+}
+
+// Returns true if the tuple at absolute position 'boundary_tuple_id' has the
+// same partition key as 'current_row_partition_key', comparing the keys
+// attribute by attribute and stopping at the first mismatch.
+bool WindowAggregationHandleAvg::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const std::vector<TypedValue> &current_row_partition_key,
+    const tuple_id boundary_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  const std::size_t num_partition_keys = partition_by_ids.size();
+  std::size_t key_index = 0;
+  while (key_index < num_partition_keys) {
+    const bool keys_match = equal_comparators_[key_index]->compareTypedValues(
+        current_row_partition_key[key_index],
+        tuple_accessor->getTypedValueAtAbsolutePosition(
+            partition_by_ids[key_index], boundary_tuple_id));
+    if (!keys_match) {
+      return false;
+    }
+    ++key_index;
+  }
+
+  return true;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
new file mode 100644
index 0000000..5b41779
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -0,0 +1,101 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class ColumnVectorsValueAccessor;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief A WindowAggregationHandle for average.
+ *
+ * Construction is private; WindowAggregateFunctionAvg is declared a friend so
+ * that it can create instances.
+ **/
+class WindowAggregationHandleAvg : public WindowAggregationHandle {
+ public:
+  ~WindowAggregationHandleAvg() override {}
+
+  /**
+   * @brief Calculate the windowed average for every tuple.
+   *
+   * @param block_accessors A pointer to the value accessor of block attributes.
+   * @param arguments The ColumnVectors of arguments; exactly one native
+   *        column is expected.
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of tuples before the current one that
+   *        belong to the window, or -1 for no limit.
+   * @param num_following The number of tuples after the current one that
+   *        belong to the window, or -1 for no limit.
+   *
+   * @return A newly allocated ColumnVector of the calculated averages.
+   **/
+  ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                          std::vector<ColumnVector*> &&arguments,
+                          const std::vector<attribute_id> &partition_by_ids,
+                          const bool is_row,
+                          const std::int64_t num_preceding,
+                          const std::int64_t num_following) const override;
+
+ private:
+  friend class WindowAggregateFunctionAvg;
+
+  /**
+   * @brief Constructor.
+   *
+   * @param partition_key_types The Types of the partition key.
+   * @param type Type of the avg value.
+   **/
+  WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
+                             const Type &type);
+
+  /**
+   * @brief Calculate the average for the window of the tuple that both
+   *        accessors are currently positioned on.
+   *
+   * @param tuple_accessor Accessor used to read partition key values.
+   * @param argument_accessor Accessor whose column 0 holds the values to
+   *        average.
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of preceding tuples in the window, or -1
+   *        for no limit.
+   * @param num_following The number of following tuples in the window, or -1
+   *        for no limit.
+   *
+   * @return The average of the non-NULL values in the window, or NULL if
+   *         there are none.
+   **/
+  TypedValue calculateOneWindow(
+      ColumnVectorsValueAccessor *tuple_accessor,
+      ColumnVectorsValueAccessor *argument_accessor,
+      const std::vector<attribute_id> &partition_by_ids,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const;
+
+  /**
+   * @brief Check whether a tuple has the same partition key as the current
+   *        row.
+   *
+   * @param tuple_accessor Accessor used to read partition key values.
+   * @param current_row_partition_key The partition key of the current row.
+   * @param boundary_tuple_id The absolute position of the tuple to check.
+   * @param partition_by_ids The ids of partition keys.
+   *
+   * @return True if every partition key attribute compares equal.
+   **/
+  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+                     const std::vector<TypedValue> &current_row_partition_key,
+                     const tuple_id boundary_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  // The Type of the argument, as passed to the constructor.
+  const Type &argument_type_;
+  // The Type used for the running sum (Long for Int/Long arguments, Double
+  // for Float/Double arguments, otherwise looked up by the argument's TypeID).
+  const Type *sum_type_;
+  // The nullable Type of the final average.
+  const Type *result_type_;
+  // Adds an argument value to the running sum.
+  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  // Divides the sum by the count (a double) to produce the average.
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationID.hpp b/expressions/window_aggregation/WindowAggregationID.hpp
new file mode 100644
index 0000000..8122df3
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationID.hpp
@@ -0,0 +1,44 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief The possible types of window aggregations.
+ *
+ * Each concrete WindowAggregateFunction passes one of these values to its
+ * base-class constructor (e.g. kMin for MIN and kSum for SUM).
+ **/
+enum class WindowAggregationID {
+  kAvg,
+  kCount,
+  kMin,
+  kMax,
+  kSum
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_



[2/4] incubator-quickstep git commit: Introduced PolicyEnforcerBase and its single-node implementation.

Posted by zu...@apache.org.
Introduced PolicyEnforcerBase and its single-node implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2b78380e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2b78380e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2b78380e

Branch: refs/heads/SQL-window-aggregation
Commit: 2b78380ec5689658c909ba63241b78f72f4644bd
Parents: cdecf0b
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Jul 18 12:18:04 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Wed Jul 20 09:12:13 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt               |  23 +-
 query_execution/ForemanSingleNode.cpp        |   6 +-
 query_execution/ForemanSingleNode.hpp        |   7 +-
 query_execution/PolicyEnforcer.cpp           | 254 ----------------------
 query_execution/PolicyEnforcer.hpp           | 223 -------------------
 query_execution/PolicyEnforcerBase.cpp       | 177 +++++++++++++++
 query_execution/PolicyEnforcerBase.hpp       | 196 +++++++++++++++++
 query_execution/PolicyEnforcerSingleNode.cpp | 110 ++++++++++
 query_execution/PolicyEnforcerSingleNode.hpp | 104 +++++++++
 9 files changed, 612 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8c12a5d..f582ba5 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -34,7 +34,8 @@ if (ENABLE_DISTRIBUTED)
 endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
-add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -85,7 +86,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanBase
-                      quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_WorkerDirectory
@@ -95,22 +96,29 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
-target_link_libraries(quickstep_queryexecution_PolicyEnforcer
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       glog
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManagerBase
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_PolicyEnforcerBase
+                      quickstep_queryexecution_QueryExecutionState
+                      quickstep_queryexecution_QueryManagerBase
                       quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
@@ -244,7 +252,8 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanBase
                       quickstep_queryexecution_ForemanSingleNode
-                      quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PolicyEnforcerBase
+                      quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 3aa1f0b..cda02a7 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -36,6 +37,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
@@ -46,6 +48,8 @@ using std::vector;
 
 namespace quickstep {
 
+class QueryHandle;
+
 DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
@@ -88,7 +92,7 @@ ForemanSingleNode::ForemanSingleNode(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_.reset(new PolicyEnforcer(
+  policy_enforcer_.reset(new PolicyEnforcerSingleNode(
       foreman_client_id_,
       num_numa_nodes,
       catalog_database_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 7506d35..caef5e0 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -24,11 +24,12 @@
 #include <vector>
 
 #include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
@@ -128,7 +129,7 @@ class ForemanSingleNode final : public ForemanBase {
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
-  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+  std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
 
   DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
deleted file mode 100644
index 4cba8c5..0000000
--- a/query_execution/PolicyEnforcer.cpp
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include "query_execution/PolicyEnforcer.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerSingleNode.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
-              " can be allocated in a single round of dispatch of messages to"
-              " the workers.");
-
-bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < kMaxConcurrentQueries) {
-    // Ok to admit the query.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
-                                     catalog_database_, storage_manager_, bus_));
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;
-    }
-  } else {
-    // This query will have to wait.
-    waiting_queries_.push(query_handle);
-    return false;
-  }
-}
-
-void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
-  std::size_t query_id;
-  QueryManagerBase::dag_node_index op_index;
-
-  switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      serialization::NormalWorkOrderCompletionMessage proto;
-      // Note: This proto message contains the time it took to execute the
-      // WorkOrder. It can be accessed in this scope.
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      worker_directory_->decrementNumQueuedWorkOrders(
-          proto.worker_thread_index());
-      if (profile_individual_workorders_) {
-        recordTimeForWorkOrder(proto);
-      }
-
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
-      break;
-    }
-    case kRebuildWorkOrderCompleteMessage: {
-      serialization::RebuildWorkOrderCompletionMessage proto;
-      // Note: This proto message contains the time it took to execute the
-      // rebuild WorkOrder. It can be accessed in this scope.
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      worker_directory_->decrementNumQueuedWorkOrders(
-          proto.worker_thread_index());
-
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
-      break;
-    }
-    case kCatalogRelationNewBlockMessage: {
-      serialization::CatalogRelationNewBlockMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      const block_id block = proto.block_id();
-
-      CatalogRelation *relation =
-          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-      relation->addBlock(block);
-
-      if (proto.has_partition_id()) {
-        relation->getPartitionSchemeMutable()->addBlockToPartition(
-            proto.partition_id(), block);
-      }
-      return;
-    }
-    case kDataPipelineMessage: {
-      serialization::DataPipelineMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-      admitted_queries_[query_id]->processDataPipelineMessage(
-          op_index, proto.block_id(), proto.relation_id());
-      break;
-    }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
-      break;
-    }
-    case kWorkOrderFeedbackMessage: {
-      WorkOrder::FeedbackMessage msg(
-          const_cast<void *>(tagged_message.message()),
-          tagged_message.message_bytes());
-      query_id = msg.header().query_id;
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = msg.header().rel_op_index;
-      admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
-  }
-  if (admitted_queries_[query_id]->queryStatus(op_index) ==
-          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
-    removeQuery(query_id);
-    if (!waiting_queries_.empty()) {
-      // Admit the earliest waiting query.
-      QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
-    }
-  }
-}
-
-void PolicyEnforcer::getWorkerMessages(
-    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
-  // Iterate over admitted queries until either there are no more
-  // messages available, or the maximum number of messages have
-  // been collected.
-  DCHECK(worker_messages->empty());
-  // TODO(harshad) - Make this function generic enough so that it
-  // works well when multiple queries are getting executed.
-  std::size_t per_query_share = 0;
-  if (!admitted_queries_.empty()) {
-    per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
-  } else {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
-    return;
-  }
-  DCHECK_GT(per_query_share, 0u);
-  std::vector<std::size_t> finished_queries_ids;
-
-  for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    std::size_t messages_collected_curr_query = 0;
-    while (messages_collected_curr_query < per_query_share) {
-      WorkerMessage *next_worker_message =
-          static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
-      if (next_worker_message != nullptr) {
-        ++messages_collected_curr_query;
-        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
-      } else {
-        // No more work ordes from the current query at this time.
-        // Check if the query's execution is over.
-        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          // If the query has been executed, remove it.
-          finished_queries_ids.push_back(admitted_query_info.first);
-        }
-        break;
-      }
-    }
-  }
-  for (const std::size_t finished_qid : finished_queries_ids) {
-    removeQuery(finished_qid);
-  }
-}
-
-void PolicyEnforcer::removeQuery(const std::size_t query_id) {
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
-    LOG(WARNING) << "Removing query with ID " << query_id
-                 << " that hasn't finished its execution";
-  }
-  admitted_queries_.erase(query_id);
-}
-
-bool PolicyEnforcer::admitQueries(
-    const std::vector<QueryHandle*> &query_handles) {
-  for (QueryHandle *curr_query : query_handles) {
-    if (!admitQuery(curr_query)) {
-      return false;
-    }
-  }
-  return true;
-}
-
-void PolicyEnforcer::recordTimeForWorkOrder(
-    const serialization::NormalWorkOrderCompletionMessage &proto) {
-  const std::size_t query_id = proto.query_id();
-  if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
-    workorder_time_recorder_[query_id];
-  }
-  workorder_time_recorder_[query_id].emplace_back(
-      proto.worker_thread_index(),
-      proto.operator_index(),
-      proto.execution_time_in_microseconds());
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
deleted file mode 100644
index 8bd6d92..0000000
--- a/query_execution/PolicyEnforcer.hpp
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <tuple>
-#include <unordered_map>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerDirectory;
-
-namespace serialization { class NormalWorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- *        in sharing resources among concurrent queries.
- **/
-class PolicyEnforcer {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param num_numa_nodes Number of NUMA nodes used by the system.
-   * @param catalog_database The CatalogDatabase used.
-   * @param storage_manager The StorageManager used.
-   * @param bus The TMB.
-   **/
-  PolicyEnforcer(const tmb::client_id foreman_client_id,
-                 const std::size_t num_numa_nodes,
-                 CatalogDatabaseLite *catalog_database,
-                 StorageManager *storage_manager,
-                 WorkerDirectory *worker_directory,
-                 tmb::MessageBus *bus,
-                 const bool profile_individual_workorders = false)
-      : foreman_client_id_(foreman_client_id),
-        num_numa_nodes_(num_numa_nodes),
-        catalog_database_(catalog_database),
-        storage_manager_(storage_manager),
-        worker_directory_(worker_directory),
-        bus_(bus),
-        profile_individual_workorders_(profile_individual_workorders) {}
-
-  /**
-   * @brief Destructor.
-   **/
-  ~PolicyEnforcer() {
-    if (hasQueries()) {
-      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
-                      "waiting queries";
-    }
-  }
-
-  /**
-   * @brief Admit a query to the system.
-   *
-   * @param query_handle The QueryHandle for the new query.
-   *
-   * @return Whether the query was admitted to the system.
-   **/
-  bool admitQuery(QueryHandle *query_handle);
-
-  /**
-   * @brief Admit multiple queries in the system.
-   *
-   * @note In the current simple implementation, we only allow one active
-   *       query in the system. Other queries will have to wait.
-   *
-   * @param query_handles A vector of QueryHandles for the queries to be
-   *        admitted.
-   *
-   * @return True if all the queries were admitted, false if at least one query
-   *         was not admitted.
-   **/
-  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
-
-  /**
-   * @brief Remove a given query that is under execution.
-   *
-   * @note This function is made public so that it is possible for a query
-   *       to be killed. Otherwise, it should only be used privately by the
-   *       class.
-   *
-   * TODO(harshad) - Extend this function to support removal of waiting queries.
-   *
-   * @param query_id The ID of the query to be removed.
-   **/
-  void removeQuery(const std::size_t query_id);
-
-  /**
-   * @brief Get worker messages to be dispatched. These worker messages come
-   *        from the active queries.
-   *
-   * @param worker_messages The worker messages to be dispatched.
-   **/
-  void getWorkerMessages(
-      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
-
-  /**
-   * @brief Process a message sent to the Foreman, which gets passed on to the
-   *        policy enforcer.
-   *
-   * @param message The message.
-   **/
-  void processMessage(const TaggedMessage &tagged_message);
-
-  /**
-   * @brief Check if there are any queries to be executed.
-   *
-   * @return True if there is at least one active or waiting query, false if
-   *         the policy enforcer doesn't have any query.
-   **/
-  inline bool hasQueries() const {
-    return !(admitted_queries_.empty() && waiting_queries_.empty());
-  }
-
-  /**
-   * @brief Get the profiling results for individual work order execution for a
-   *        given query.
-   *
-   * @note This function should only be called if profiling individual work
-   *       orders option is enabled.
-   *
-   * @param query_id The ID of the query for which the profiling results are
-   *        requested.
-   *
-   * @return A vector of tuples, each being a single profiling entry.
-   **/
-  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
-      getProfilingResults(const std::size_t query_id) const {
-    DCHECK(profile_individual_workorders_);
-    DCHECK(workorder_time_recorder_.find(query_id) !=
-           workorder_time_recorder_.end());
-    return workorder_time_recorder_.at(query_id);
-  }
-
- private:
-  static constexpr std::size_t kMaxConcurrentQueries = 1;
-
-  /**
-   * @brief Record the execution time for a finished WorkOrder.
-   *
-   * TODO(harshad) - Extend the functionality to rebuild work orders.
-   *
-   * @param proto The completion message proto sent after the WorkOrder
-   *        execution.
-   **/
-  void recordTimeForWorkOrder(
-      const serialization::NormalWorkOrderCompletionMessage &proto);
-
-  const tmb::client_id foreman_client_id_;
-  const std::size_t num_numa_nodes_;
-
-  CatalogDatabaseLite *catalog_database_;
-  StorageManager *storage_manager_;
-  WorkerDirectory *worker_directory_;
-
-  tmb::MessageBus *bus_;
-  const bool profile_individual_workorders_;
-
-  // Key = query ID, value = QueryManagerBase* for the key query.
-  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
-
-  // The queries which haven't been admitted yet.
-  std::queue<QueryHandle*> waiting_queries_;
-
-  // Key = Query ID.
-  // Value = A tuple indicating a record of executing a work order.
-  // Within a tuple ...
-  // 1st element: Logical worker ID.
-  // 2nd element: Operator ID.
-  // 3rd element: Time in microseconds to execute the work order.
-  std::unordered_map<
-      std::size_t,
-      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
-      workorder_time_recorder_;
-
-  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
new file mode 100644
index 0000000..d16a502
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Routes a message to the QueryManager of the query it refers to, then
+// retires that query if it reports kQueryExecuted and admits the earliest
+// waiting query, if any. kCatalogRelationNewBlockMessage is the exception:
+// it only updates the catalog and returns before the query bookkeeping.
+void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
+  // Both are assigned by every case that falls out of the switch below.
+  std::size_t query_id;
+  QueryManagerBase::dag_node_index op_index;
+
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+      if (profile_individual_workorders_) {
+        recordTimeForWorkOrder(proto);
+      }
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // rebuild WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      const block_id block = proto.block_id();
+
+      // NOTE(review): the static_cast assumes catalog_database_ really points
+      // to a CatalogDatabase; nothing here checks that -- confirm at callers.
+      CatalogRelation *relation =
+          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+      relation->addBlock(block);
+
+      if (proto.has_partition_id()) {
+        relation->getPartitionSchemeMutable()->addBlockToPartition(
+            proto.partition_id(), block);
+      }
+      // This message sets neither query_id nor op_index, so it must not reach
+      // the query-status check after the switch.
+      return;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processDataPipelineMessage(
+          op_index, proto.block_id(), proto.relation_id());
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+
+      // Check if new work orders are available.
+      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      // Unlike the other cases, the payload is not parsed as a protobuf; it is
+      // handed to WorkOrder::FeedbackMessage as a raw buffer.
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+      query_id = msg.header().query_id;
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = msg.header().rel_op_index;
+      admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
+      break;
+    }
+    default:
+      // glog's LOG(FATAL) terminates the program, so control never continues
+      // past the switch with query_id and op_index unassigned.
+      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+  }
+  // NOTE(review): the lookups above and below use operator[], which would
+  // insert a null entry for an unknown query ID when the DCHECKs are compiled
+  // out -- confirm that senders only reference admitted queries.
+  if (admitted_queries_[query_id]->queryStatus(op_index) ==
+          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+// Removes an admitted query, warning if it has not finished executing.
+// An unknown query ID is a caller bug: it trips the DCHECK in debug builds
+// and is logged and ignored otherwise.
+void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
+  // Look the query up once. Using operator[] here would default-insert a null
+  // QueryManagerBase for an unknown ID and then dereference it.
+  const auto query_it = admitted_queries_.find(query_id);
+  DCHECK(query_it != admitted_queries_.end());
+  if (query_it == admitted_queries_.end()) {
+    LOG(ERROR) << "Cannot remove query with ID " << query_id
+               << " because it has not been admitted";
+    return;
+  }
+  if (!query_it->second->getQueryExecutionState().hasQueryExecutionFinished()) {
+    LOG(WARNING) << "Removing query with ID " << query_id
+                 << " that hasn't finished its execution";
+  }
+  admitted_queries_.erase(query_it);
+}
+
+// Offers every query handle to admitQuery() and reports whether all of them
+// were admitted.
+bool PolicyEnforcerBase::admitQueries(
+    const std::vector<QueryHandle*> &query_handles) {
+  // Do not stop at the first rejection: admitQuery() may return false for a
+  // query it has merely queued as waiting, and returning early would drop
+  // every handle after it without admitting or queueing it.
+  bool all_queries_admitted = true;
+  for (QueryHandle *curr_query : query_handles) {
+    if (!admitQuery(curr_query)) {
+      all_queries_admitted = false;
+    }
+  }
+  return all_queries_admitted;
+}
+
+// Appends one profiling entry (worker thread index, operator index, execution
+// time in microseconds) to the records kept for the work order's query.
+void PolicyEnforcerBase::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  // operator[] creates the empty record vector the first time a query ID is
+  // seen, so no separate existence check is needed.
+  workorder_time_recorder_[proto.query_id()].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
new file mode 100644
index 0000000..0482ebc
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -0,0 +1,196 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A base class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param catalog_database The CatalogDatabase used.
+   * @param profile_individual_workorders If true, profile each normal work order.
+   **/
+  PolicyEnforcerBase(CatalogDatabaseLite *catalog_database,
+                     const bool profile_individual_workorders)
+      : catalog_database_(catalog_database),
+        profile_individual_workorders_(profile_individual_workorders) {}
+
+  /**
+   * @brief Virtual Destructor.
+   *
+   * @note Only logs a warning if admitted or waiting queries remain; it does
+   *       not try to finish or cancel them.
+   **/
+  virtual ~PolicyEnforcerBase() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note In the current simple implementation, we only allow one active
+   *       query in the system. Other queries will have to wait.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry.
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());
+    return workorder_time_recorder_.at(query_id);
+  }
+
+ protected:
+  // NOTE(review): presumably the cap on simultaneously admitted queries; this
+  // class never reads it, so enforcement is left to admitQuery()
+  // implementations -- confirm in the subclasses.
+  static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
+  CatalogDatabaseLite *catalog_database_;
+
+  const bool profile_individual_workorders_;
+
+  // Key = query ID, value = QueryManagerBase* for the key query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  // Key = Query ID.
+  // Value = A tuple indicating a record of executing a work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
+ private:
+  // The pure virtual hooks below are private: subclasses override them, and
+  // they are only called from this class's own member functions.
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
+  /**
+   * @brief Decrement the number of queued workorders for the given worker by 1.
+   *
+   * @param worker_index The logical ID of the given worker.
+   **/
+  virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
new file mode 100644
index 0000000..549e39f
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -0,0 +1,110 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Command-line flag (gflags) read by
+// PolicyEnforcerSingleNode::getWorkerMessages(): its value is divided by the
+// number of admitted queries to obtain the number of WorkerMessages collected
+// from each query in one call. Defaults to 20.
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+/**
+ * Collect the next batch of WorkerMessages from the admitted queries.
+ *
+ * Each admitted query contributes at most
+ * FLAGS_max_msgs_per_dispatch_round / (number of admitted queries) messages,
+ * but never fewer than one slot per query (see the note on clamping below).
+ * Queries that report no further message and whose execution state says they
+ * have finished are removed once the iteration over the admitted queries is
+ * complete.
+ *
+ * @param worker_messages Output vector; must be empty on entry. Ownership of
+ *        every collected message is transferred to it.
+ **/
+void PolicyEnforcerSingleNode::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  DCHECK(worker_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  // Integer division truncates to zero when the flag is 0 or smaller than the
+  // number of admitted queries. A zero share would skip the collection loop
+  // entirely, so no message would be dispatched and finished queries would
+  // never be detected or removed. Clamp to one message per query so that
+  // progress is always possible, even if the round then slightly exceeds the
+  // configured maximum.
+  std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  if (per_query_share == 0u) {
+    per_query_share = 1u;
+  }
+
+  // Removal is deferred: erasing from admitted_queries_ while iterating over
+  // it would invalidate the iteration.
+  std::vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      WorkerMessage *next_worker_message =
+          static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
+      if (next_worker_message == nullptr) {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remember it for removal.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+      ++messages_collected_curr_query;
+      worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+    }
+  }
+
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+/**
+ * Try to admit a query for execution.
+ *
+ * When the number of admitted queries has reached
+ * PolicyEnforcerBase::kMaxConcurrentQueries the handle is appended to the
+ * waiting queue. When an admitted query already uses the same ID an error is
+ * logged and the handle is not queued. Otherwise a QueryManagerSingleNode is
+ * created for the query and stored under its ID.
+ *
+ * @param query_handle The QueryHandle for the new query.
+ *
+ * @return true if the query was admitted, false otherwise.
+ **/
+bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
+  // At capacity: this query will have to wait.
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  const std::size_t qid = query_handle->query_id();
+
+  // Refuse a second query that reuses an ID which is already admitted.
+  if (admitted_queries_.count(qid) != 0) {
+    LOG(ERROR) << "Query with the same ID " << qid << " exists";
+    return false;
+  }
+
+  // Ok to admit the query.
+  admitted_queries_[qid].reset(
+      new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+                                 catalog_database_, storage_manager_, bus_));
+  return true;
+}
+
+// Delegates to WorkerDirectory::decrementNumQueuedWorkOrders() on the
+// directory supplied at construction, passing worker_index through unchanged.
+void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(const std::size_t worker_index) {
+  worker_directory_->decrementNumQueuedWorkOrders(worker_index);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
new file mode 100644
index 0000000..671fd83
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -0,0 +1,104 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param worker_directory The WorkerDirectory on which the number of queued
+   *        work orders per worker is decremented.
+   * @param bus The TMB.
+   * @param profile_individual_workorders Forwarded to PolicyEnforcerBase;
+   *        presumably enables recording the execution time of individual
+   *        work orders (confirm against PolicyEnforcerBase).
+   *
+   * @note None of the pointer arguments is deleted by this class's
+   *       destructor, which is empty.
+   **/
+  PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id,
+                           const std::size_t num_numa_nodes,
+                           CatalogDatabaseLite *catalog_database,
+                           StorageManager *storage_manager,
+                           WorkerDirectory *worker_directory,
+                           tmb::MessageBus *bus,
+                           const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerSingleNode() override {}
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ private:
+  // Decrements the number of queued work orders of the given worker via the
+  // WorkerDirectory.
+  void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+
+  // Passed to every QueryManagerSingleNode created in admitQuery().
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  // Non-owning pointers supplied at construction.
+  StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_