You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/25 20:23:59 UTC
[1/4] incubator-quickstep git commit: Added QueryManagerDistributed.
[Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation b80f5e9cf -> c0bb4620c (forced update)
Added QueryManagerDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cdecf0b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cdecf0b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cdecf0b5
Branch: refs/heads/SQL-window-aggregation
Commit: cdecf0b561d0cf1c80516d51401bd098a6263659
Parents: db0e7e3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jul 12 22:04:54 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 19 11:46:42 2016 -0700
----------------------------------------------------------------------
CMakeLists.txt | 10 ++
query_execution/CMakeLists.txt | 34 +++-
query_execution/QueryExecutionMessages.proto | 34 +++-
query_execution/QueryExecutionTypedefs.hpp | 3 +
query_execution/QueryManagerDistributed.cpp | 159 +++++++++++++++++++
query_execution/QueryManagerDistributed.hpp | 105 +++++++++++++
query_execution/ShiftbossDirectory.hpp | 181 ++++++++++++++++++++++
7 files changed, 518 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0e5d3de..0bbde61 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -281,6 +281,16 @@ else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
endif()
+ if (ENABLE_DISTRIBUTED)
+ # Clang reports such warning when using Protoc 3.0 beta.
+ if(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
+ CHECK_CXX_COMPILER_FLAG("-Wno-extended-offsetof" COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+ if (COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-extended-offsetof")
+ endif()
+ endif()
+ endif()
+
# One of the protobuf headers includes a nested anonymous union within
# another anonymous type. Ordinarily we work around this by compiling the
# protobuf libraries themselves with "-Wno-nested-anon-types" and including
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 028531d..8c12a5d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -28,10 +28,10 @@ else()
endif()
# Declare micro-libs:
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
-add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -46,7 +46,13 @@ add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryE
add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
+endif()
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
+endif()
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -143,6 +149,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
quickstep_utility_SortConfiguration_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_relationaloperators_WorkOrder_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionState
glog
@@ -167,6 +174,22 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
quickstep_storage_StorageBlockInfo
quickstep_utility_DAG
quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_utility_DAG
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
@@ -182,6 +205,11 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
glog
quickstep_relationaloperators_WorkOrder_proto
@@ -233,7 +261,9 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_WorkerSelectionPolicy)
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
- quickstep_queryexecution_BlockLocator)
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_ShiftbossDirectory)
endif()
# Tests:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..fa20993 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,22 +16,24 @@ syntax = "proto2";
package quickstep.serialization;
+import "relational_operators/WorkOrder.proto";
+
// Used for any messages that do not carry payloads.
message EmptyMessage {
}
// Note: There are different types of completion messages for normal work orders
// rebuild work orders. This can be potentially helpful when we want to collect
-// different statistics for executing different types of work orders.
-// e.g. In select normal work order completion message, we could be interested
-// in the selectivity of the block whose work order got execute. In rebuild work
-// order completion message, we may be interested in adding the compression
+// different statistics for executing different types of work orders.
+// e.g. In select normal work order completion message, we could be interested
+// in the selectivity of the block whose work order got execute. In rebuild work
+// order completion message, we may be interested in adding the compression
// ratio or dictionary size of the rebuilt block.
-// TODO(harshad) : If there are different fields in the two message types below,
+// TODO(harshad) : If there are different fields in the two message types below,
// create a base message class called WorkOrderCompletionMessage and make the
// two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class.
+// classes can be moved to the base class.
// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
message NormalWorkOrderCompletionMessage {
@@ -70,6 +72,26 @@ message WorkOrdersAvailableMessage {
required uint64 query_id = 2;
}
+// Distributed version related messages.
+message WorkOrderMessage {  // One serialized WorkOrder tagged with the query and operator it belongs to.
+ required uint64 query_id = 1;  // ID of the query the WorkOrder belongs to.
+ required uint64 operator_index = 2;  // Index of the operator (in the query DAG) the WorkOrder was fetched for.
+ required WorkOrder work_order = 3;  // The WorkOrder itself; see relational_operators/WorkOrder.proto.
+}
+
+message InitiateRebuildMessage {  // Asks a Shiftboss to initiate the rebuild for one operator (sent from Foreman to Shiftboss).
+ required uint64 query_id = 1;  // ID of the query the operator belongs to.
+ required uint64 operator_index = 2;  // Index of the operator (in the query DAG) whose rebuild is initiated.
+ required uint64 insert_destination_index = 3;  // The operator's InsertDestination ID.
+ required int32 relation_id = 4;  // ID of the operator's output relation.
+}
+
+message InitiateRebuildResponseMessage {  // Sent from Shiftboss to Foreman; presumably the reply to InitiateRebuildMessage.
+ required uint64 query_id = 1;  // ID of the query the operator belongs to.
+ required uint64 operator_index = 2;  // Index of the operator (in the query DAG) being rebuilt.
+ required uint64 num_rebuild_work_orders = 3;  // Number of rebuild work orders reported by the Shiftboss.
+}
+
// BlockLocator related messages.
message BlockDomainRegistrationMessage {
// Format IP:Port, i.e., "0.0.0.0:0".
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..61e76d7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,6 +73,9 @@ enum QueryExecutionMessageType : message_type_id {
kPoisonMessage, // From the main thread to Foreman and Workers.
#ifdef QUICKSTEP_DISTRIBUTED
+ kInitiateRebuildMessage, // From Foreman to Shiftboss.
+ kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
new file mode 100644
index 0000000..e906fa5
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+
+namespace quickstep {
+
+QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
+                                                 ShiftbossDirectory *shiftbosses,
+                                                 const tmb::client_id foreman_client_id,
+                                                 tmb::MessageBus *bus)
+    : QueryManagerBase(query_handle),
+      shiftbosses_(shiftbosses),
+      foreman_client_id_(foreman_client_id),
+      bus_(bus),
+      normal_workorder_protos_container_(
+          new WorkOrderProtosContainer(num_operators_in_dag_)) {
+  // Walk the whole DAG once. Every operator whose blocking dependencies are
+  // already satisfied is told so, and is then processed immediately so that
+  // its initial work order protos get collected.
+  for (dag_node_index op_index = 0; op_index < num_operators_in_dag_; ++op_index) {
+    if (!checkAllBlockingDependenciesMet(op_index)) {
+      // Still blocked by some dependency; nothing to do for it yet.
+      continue;
+    }
+
+    query_dag_->getNodePayloadMutable(op_index)->informAllBlockingDependenciesMet();
+    processOperator(op_index, false);
+  }
+}
+
+serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
+    const dag_node_index start_operator_index) {
+  // Visit each operator at most once, beginning with 'start_operator_index'
+  // and wrapping around to the front of the DAG. The first operator that is
+  // not finished and has a WorkOrder proto available wins.
+  dag_node_index candidate = start_operator_index;
+  for (size_t num_visited = 0;
+       num_visited < num_operators_in_dag_;
+       ++num_visited, candidate = (candidate + 1) % num_operators_in_dag_) {
+    if (query_exec_state_->hasExecutionFinished(candidate)) {
+      continue;
+    }
+
+    unique_ptr<serialization::WorkOrder> pending_proto(
+        normal_workorder_protos_container_->getWorkOrderProto(candidate));
+    if (pending_proto == nullptr) {
+      // This operator has nothing to hand out at the moment.
+      continue;
+    }
+
+    // Count the WorkOrder as queued before it leaves the query manager.
+    query_exec_state_->incrementNumQueuedWorkOrders(candidate);
+
+    unique_ptr<serialization::WorkOrderMessage> wrapped(new serialization::WorkOrderMessage);
+    wrapped->set_query_id(query_id_);
+    wrapped->set_operator_index(candidate);
+    wrapped->mutable_work_order()->MergeFrom(*pending_proto);
+
+    // The raw pointer is released to the caller.
+    return wrapped.release();
+  }
+
+  // No operator had a normal WorkOrder proto ready.
+  return nullptr;
+}
+
+bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
+  // An operator that has already finished generating work orders cannot
+  // produce anything new.
+  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+    return false;
+  }
+
+  // Do not fetch any work units until all blocking dependencies are met.
+  // The relational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  if (!checkAllBlockingDependenciesMet(index)) {
+    return false;
+  }
+
+  WorkOrderProtosContainer *container = normal_workorder_protos_container_.get();
+  const size_t num_protos_before = container->getNumWorkOrderProtos(index);
+
+  // Let the operator append its protos to the container; it reports whether
+  // it has finished generating.
+  if (query_dag_->getNodePayloadMutable(index)->getAllWorkOrderProtos(container)) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
+  }
+
+  // TODO(shoban): It would be a good check to see if operator is making
+  // useful progress, i.e., the operator either generates work orders to
+  // execute or still has pending work orders executing. However, this will not
+  // work if Foreman polls operators without feeding data. This check can be
+  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+  // pending work orders are completed or new input blocks feed.
+
+  // New protos were generated iff the pending count for this operator grew.
+  return num_protos_before < container->getNumWorkOrderProtos(index);
+}
+
+/**
+ * Asks a Shiftboss to initiate the rebuild for the operator at 'index' by
+ * sending it an InitiateRebuildMessage over the TMB.
+ *
+ * The rebuild status is recorded as initiated with a negative number of
+ * rebuild work orders, meaning the count is still to be determined.
+ *
+ * @param index The index of the operator in the query DAG. A rebuild must be
+ *        required and not yet initiated for it (checked in debug builds).
+ *
+ * @return Always false: the number of rebuild work orders is not known until
+ *         the Shiftboss reports it.
+ **/
+bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
+  DCHECK(checkRebuildRequired(index));
+  DCHECK(!checkRebuildInitiated(index));
+
+  const RelationalOperator &op = query_dag_->getNodePayload(index);
+  DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
+
+  serialization::InitiateRebuildMessage proto;
+  // 'query_id' is a required field of InitiateRebuildMessage. Leaving it
+  // unset makes the message uninitialized, so it cannot be serialized and
+  // parsed reliably.
+  proto.set_query_id(query_id_);
+  proto.set_operator_index(index);
+  proto.set_insert_destination_index(op.getInsertDestinationID());
+  proto.set_relation_id(op.getOutputRelationID());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  // NOTE(review): the buffer is freed right after the TaggedMessage is built,
+  // which presumes TaggedMessage copies the payload -- confirm against TMB.
+  TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes),
+                           proto_length,
+                           kInitiateRebuildMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+            << "') to Shiftboss";
+  // TODO(zuyu): Multiple workers support.
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftbosses_->getClientId(0),
+                                     move(tagged_msg));
+
+  // The negative value indicates that the number of rebuild work orders is to be
+  // determined.
+  query_exec_state_->setRebuildStatus(index, -1, true);
+
+  // Wait for Shiftbosses to report the number of rebuild work orders.
+  return false;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
new file mode 100644
index 0000000..8641c22
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+
+#include <memory>
+
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class QueryHandle;
+class ShiftbossDirectory;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query including generation
+ * of new work orders, keeping track of the query execution state.
+ *
+ * @note Work orders are held as protos in a WorkOrderProtosContainer and are
+ *       handed out wrapped in WorkOrderMessage protos. Rebuilds are requested
+ *       from a Shiftboss through the TMB.
+ **/
+class QueryManagerDistributed final : public QueryManagerBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_handle The QueryHandle object for this query.
+ * @param shiftbosses The ShiftbossDirectory to use.
+ * @param foreman_client_id The TMB client ID of the foreman thread.
+ * @param bus The TMB used for communication.
+ **/
+ QueryManagerDistributed(QueryHandle *query_handle,
+ ShiftbossDirectory *shiftbosses,
+ const tmb::client_id foreman_client_id,
+ tmb::MessageBus *bus);
+
+ ~QueryManagerDistributed() override {}
+
+ bool fetchNormalWorkOrders(const dag_node_index index) override;  // Returns true iff new WorkOrder protos were generated for the operator at 'index'.
+
+ /**
+ * @brief Get the next normal workorder to be executed, wrapped in a
+ * WorkOrderMessage proto.
+ *
+ * @note Every operator is examined at most once, wrapping around the DAG.
+ * The returned proto is released to the caller.
+ *
+ * @param start_operator_index Begin the search for the schedulable WorkOrder
+ * with the operator at this index.
+ *
+ * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
+ * to be executed, return NULL.
+ **/
+ serialization::WorkOrderMessage* getNextWorkOrderMessage(
+ const dag_node_index start_operator_index);
+
+ private:
+ bool checkNormalExecutionOver(const dag_node_index index) const override {  // True when all dependencies are met, no proto is pending or queued, and generation is done.
+ return (checkAllDependenciesMet(index) &&
+ !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+ query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
+ query_exec_state_->hasDoneGenerationWorkOrders(index));
+ }
+
+ bool initiateRebuild(const dag_node_index index) override;  // Sends an InitiateRebuildMessage to a Shiftboss; always returns false.
+
+ bool checkRebuildOver(const dag_node_index index) const override {  // True once the rebuild was initiated and no rebuild work orders remain.
+ return query_exec_state_->hasRebuildInitiated(index) &&
+ (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+ }
+
+ ShiftbossDirectory *shiftbosses_;  // Supplied by the constructor; never deleted by this class.
+
+ const tmb::client_id foreman_client_id_;  // Used as the sender of the messages this class sends.
+ tmb::MessageBus *bus_;  // Supplied by the constructor; never deleted by this class.
+
+ std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;  // Pending normal WorkOrder protos, one slot per operator in the DAG.
+
+ DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
new file mode 100644
index 0000000..abb5f32
--- /dev/null
+++ b/query_execution/ShiftbossDirectory.hpp
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class which keeps the metadata about the shiftbosses.
+ *
+ * Each Shiftboss is identified by its TMB client ID and is assigned a dense
+ * index in the order it was added. The per-Shiftboss capacity and queued
+ * work order count are stored in parallel vectors addressed by that index.
+ *
+ * @note This class is intended to be used only by ForemanDistributed thread.
+ *       Therefore, none of the methods in this class are thread-safe.
+ **/
+class ShiftbossDirectory {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  ShiftbossDirectory() = default;
+
+  /**
+   * @brief Add the Shiftboss.
+   *
+   * @note A Shiftboss must be added at most once. Adding the same client ID
+   *       twice would leave the ID-to-index map and the per-index vectors
+   *       with different sizes, so this is checked in debug builds.
+   *
+   * @param shiftboss_id the TMB client ID of Shiftboss thread.
+   * @param capacity The Work Order processing capacity of Shiftboss.
+   **/
+  void addShiftboss(const tmb::client_id shiftboss_id,
+                    const std::size_t capacity) {
+    DCHECK(client_id_indices_.find(shiftboss_id) == client_id_indices_.end());
+
+    // The new Shiftboss gets the next free index in all parallel vectors.
+    client_id_indices_.emplace(shiftboss_id, client_ids_.size());
+    client_ids_.push_back(shiftboss_id);
+    work_order_capacities_.push_back(capacity);
+    num_queued_work_orders_.push_back(0u);
+  }
+
+  /**
+   * @brief Whether the ShiftbossDirectory has any Shiftboss.
+   *
+   * @return True if no Shiftboss in ShiftbossDirectory. Otherwise false.
+   **/
+  bool empty() const {
+    DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
+    DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
+    DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
+
+    return client_ids_.empty();
+  }
+
+  /**
+   * @brief Get the number of Shiftboss in ShiftbossDirectory.
+   *
+   * @return The number of Shiftboss in ShiftbossDirectory.
+   **/
+  std::size_t size() const {
+    DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
+    DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
+    DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
+
+    return client_ids_.size();
+  }
+
+  /**
+   * @brief Get the TMB client ID of the specified Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return The TMB client ID of the given Shiftboss.
+   **/
+  tmb::client_id getClientId(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return client_ids_[shiftboss_index];
+  }
+
+  /**
+   * @brief Get the Shiftboss index from the specified client id.
+   *
+   * @param shiftboss_id The TMB client ID of Shiftboss. It must have been
+   *        added before (checked in debug builds only).
+   *
+   * @return The index of the given Shiftboss.
+   **/
+  std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
+    const auto it = client_id_indices_.find(shiftboss_id);
+    DCHECK(it != client_id_indices_.end());
+
+    return it->second;
+  }
+
+  /**
+   * @brief Whether the given Shiftboss has reached its capacity.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return True if reached the capacity. Otherwise false.
+   **/
+  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
+  }
+
+  /**
+   * @brief Add the number of new work orders for the given Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   * @param num_new_work_orders The number of the new work orders will be
+   *        executed on Shiftboss indexed by 'shiftboss_index'.
+   **/
+  void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
+                              const std::size_t num_new_work_orders) {
+    // Same bounds check as the other per-index accessors.
+    DCHECK_LT(shiftboss_index, size());
+    num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
+  }
+
+  /**
+   * @brief Increase the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    ++num_queued_work_orders_[shiftboss_index];
+  }
+
+  /**
+   * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    // The counter is unsigned; guard against wrapping below zero.
+    DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
+    --num_queued_work_orders_[shiftboss_index];
+  }
+
+ private:
+  // The TMB client IDs of Shiftbosses.
+  // TODO(zuyu): Support deletions, as Shiftbosses go down.
+  std::vector<tmb::client_id> client_ids_;
+
+  // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
+  std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
+
+  // The max number of WorkOrders per Shiftboss.
+  std::vector<std::size_t> work_order_capacities_;
+
+  // The number of WorkOrders queued for execution per Shiftboss, and the value
+  // should be not greater than that of work_order_capacities_.
+  std::vector<std::size_t> num_queued_work_orders_;
+
+  DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
[3/4] incubator-quickstep git commit: - Supported ROWS mode for AVG
window aggregation. - Created WindowAggregateFunctions in
expressions/window_aggregation. - Created WindowAggregationHandle for AVG to
actually do the calculation. - Other functions will
Posted by zu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..c044a98
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+ constexpr int kNumTuples = 100;  // Number of rows generated for every test column.
+ constexpr int kNumTuplesPerPartition = 8;  // Row i gets the partition key i / kNumTuplesPerPartition.
+ constexpr int kNullInterval = 25;  // Every row whose index is a multiple of this holds a NULL argument.
+ constexpr int kNumPreceding = 2;  // num_preceding passed to calculate() in the sliding-window check.
+ constexpr int kNumFollowing = 2;  // num_following passed to calculate() in the sliding-window check.
+
+} // namespace
+
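+// Test fixture for the AVG window aggregation handle. Argument types are created with Instance(true) and the generated columns contain NULL values.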
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+ // Creates the AVG window aggregation handle for a single argument of
+ // 'argument_type' and a single partition key of the type returned by
+ // TypeFactory::GetType(kInt, false), and stores it in 'handle_avg_'.
+ void initializeHandle(const Type &argument_type) {
+ std::vector<const Type*> argument_types;
+ argument_types.push_back(&argument_type);
+
+ std::vector<const Type*> partition_key_types;
+ partition_key_types.push_back(&TypeFactory::GetType(kInt, false));
+
+ const WindowAggregateFunction &avg_function =
+ WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+ handle_avg_.reset(avg_function.createHandle(std::move(argument_types),
+ std::move(partition_key_types)));
+ }
+
+ // Returns what the AVG window aggregate function's canApplyToTypes() reports
+ // for a single argument of type 'typeID'. kChar and kVarChar are
+ // instantiated with a length parameter of 10.
+ static bool CanApplyToTypesTest(TypeID typeID) {
+ const bool takes_length = (typeID == kChar || typeID == kVarChar);
+ const Type &type = takes_length
+ ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+ : TypeFactory::GetType(typeID);
+
+ const std::vector<const Type*> argument_types(1, &type);
+ const WindowAggregateFunction &avg_function =
+ WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+ return avg_function.canApplyToTypes(argument_types);
+ }
+
+ // Returns whether resultTypeForArgumentTypes() of the AVG window aggregate
+ // function, given one argument of type 'input_type_id', yields a type whose
+ // ID is 'output_type_id'.
+ static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+ TypeID output_type_id) {
+ const std::vector<const Type*> argument_types(1, &TypeFactory::GetType(input_type_id));
+ const WindowAggregateFunction &avg_function =
+ WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+ const Type *result_type = avg_function.resultTypeForArgumentTypes(argument_types);
+ return (output_type_id == result_type->getTypeID());
+ }
+
+ // Verifies that 'actual' is a native column vector with one entry per element
+ // of 'expected'. A nullptr in 'expected' means the entry must be NULL;
+ // otherwise the literal stored in the entry must equal the pointed-to value.
+ template <typename CppType>
+ static void CheckAvgValues(
+ const std::vector<CppType*> &expected,
+ const ColumnVector *actual) {
+ EXPECT_TRUE(actual->isNative());
+ const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+ EXPECT_EQ(expected.size(), native->size());
+ for (std::size_t i = 0; i < expected.size(); ++i) {
+ if (expected[i] != nullptr) {
+ EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral<CppType>());
+ continue;
+ }
+
+ // No expected value: the computed entry has to be NULL.
+ EXPECT_TRUE(native->getTypedValue(i).isNull());
+ }
+ }
+
+ // Static templated helper that stores the int 'value' into '*data'; this generic version uses plain assignment.
+ template <typename CppType>
+ static void SetDataType(int value, CppType *data) {
+ *data = value;
+ }
+
+ template <typename GenericType, typename OutputType = DoubleType>  // OutputType: type whose cpptype the expected averages are computed in.
+ void checkWindowAggregationAvgGeneric() {  // Builds the test columns once, then runs both window-frame checks on them.
+ const GenericType &type = GenericType::Instance(true);
+ initializeHandle(type);
+
+ // Create the argument column, its mirror vector of cpptype pointers, and the partition key column.
+ std::vector<typename GenericType::cpptype*> argument_cpp_vector;
+ argument_cpp_vector.reserve(kNumTuples);
+ ColumnVector *argument_type_vector =
+ createArgumentGeneric<GenericType>(&argument_cpp_vector);
+ NativeColumnVector *partition_key_vector =
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);  // NOTE(review): only kNumTuples values are appended below; confirm why the size argument is kNumTuples + 2.
+
+ for (int i = 0; i < kNumTuples; ++i) {
+ partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));  // Integer division: consecutive runs of kNumTuplesPerPartition rows share a key.
+ }
+
+ // Create tuple ValueAccessor: column 0 is the partition key, column 1 is the argument.
+ ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();  // NOTE(review): never deleted in this function; confirm whether calculate() takes ownership.
+ tuple_accessor->addColumn(partition_key_vector);
+ tuple_accessor->addColumn(argument_type_vector);  // NOTE(review): the same column is also passed to calculate() as an argument; confirm who owns it.
+
+ // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+ checkAccumulate<GenericType, OutputType>(tuple_accessor,
+ argument_type_vector,
+ argument_cpp_vector);
+ // Test kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+ checkSlidingWindow<GenericType, OutputType>(tuple_accessor,
+ argument_type_vector,
+ argument_cpp_vector);
+ }
+
+ // Allocates the argument column with kNumTuples entries and mirrors every
+ // entry into 'argument_cpp_vector' (nullptr stands for a NULL entry). The
+ // newly allocated column is returned to the caller.
+ template <typename GenericType>
+ ColumnVector *createArgumentGeneric(
+ std::vector<typename GenericType::cpptype*> *argument_cpp_vector) {
+ using CppType = typename GenericType::cpptype;
+
+ const GenericType &type = GenericType::Instance(true);
+ const bool is_integer_type = (type.getTypeID() == kInt || type.getTypeID() == kLong);
+ NativeColumnVector *column = new NativeColumnVector(type, kNumTuples);
+
+ for (int i = 0; i < kNumTuples; ++i) {
+ if (i % kNullInterval != 0) {
+ CppType *val = new CppType;
+ if (is_integer_type) {
+ SetDataType(i - 10, val);
+ } else {
+ // SetDataType() takes an int, so this quotient is converted to int
+ // before it is stored.
+ SetDataType(static_cast<float>(i - 10) / 10, val);
+ }
+
+ column->appendTypedValue(type.makeValue(val));
+ argument_cpp_vector->push_back(val);
+ } else {
+ // Insert a NULL every kNullInterval tuples.
+ argument_cpp_vector->push_back(nullptr);
+ column->appendTypedValue(type.makeNullValue());
+ }
+ }
+
+ return column;
+ }
+
+ // Checks ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Runs the handle on
+ // 'tuple_accessor' and compares every row against a running average that
+ // restarts every kNumTuplesPerPartition rows. NULL arguments are skipped, and
+ // the expected result is NULL as long as no non-NULL value has been seen in
+ // the current partition.
+ template <typename GenericType, typename OutputType>
+ void checkAccumulate(ColumnVectorsValueAccessor *tuple_accessor,
+ ColumnVector *argument_type_vector,
+ const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+ std::vector<ColumnVector*> arguments;
+ arguments.push_back(argument_type_vector);
+ // The partition key index is 0.
+ std::vector<attribute_id> partition_key(1, 0);
+
+ ColumnVector *result =
+ handle_avg_->calculate(tuple_accessor,
+ std::move(arguments),
+ partition_key,
+ true /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+
+ // Get the cpptype result.
+ std::vector<typename OutputType::cpptype*> result_cpp_vector;
+ result_cpp_vector.reserve(argument_cpp_vector.size());
+
+ // Give both accumulators a defined value up front instead of relying on the
+ // first loop iteration to initialize them.
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ int count = 0;
+ for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+ // Start of new partition
+ if (i % kNumTuplesPerPartition == 0) {
+ SetDataType(0, &sum);
+ count = 0;
+ }
+
+ typename GenericType::cpptype *value = argument_cpp_vector[i];
+ if (value != nullptr) {
+ sum += *value;
+ count++;
+ }
+
+ if (count == 0) {
+ result_cpp_vector.push_back(nullptr);
+ } else {
+ typename OutputType::cpptype *result_cpp_value =
+ new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+ result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
+
+ CheckAvgValues(result_cpp_vector, result);
+ }
+
+ // Checks ROWS BETWEEN kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+ // Runs the handle on 'tuple_accessor' and, for every row, recomputes the
+ // average over the rows of the frame that lie in the same partition. NULL
+ // arguments are skipped; a frame without any non-NULL value expects NULL.
+ template <typename GenericType, typename OutputType>
+ void checkSlidingWindow(ColumnVectorsValueAccessor *tuple_accessor,
+ ColumnVector *argument_type_vector,
+ const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+ std::vector<ColumnVector*> arguments;
+ arguments.push_back(argument_type_vector);
+ // The partition key index is 0.
+ std::vector<attribute_id> partition_key(1, 0);
+
+ ColumnVector *result =
+ handle_avg_->calculate(tuple_accessor,
+ std::move(arguments),
+ partition_key,
+ true /* is_row */,
+ kNumPreceding,
+ kNumFollowing);
+
+ // Get the cpptype result.
+ // For each value, calculate all surrounding values in the window.
+ std::vector<typename OutputType::cpptype*> result_cpp_vector;
+ result_cpp_vector.reserve(argument_cpp_vector.size());
+
+ // Use unsigned bounds so that all index arithmetic below has one signedness.
+ const std::size_t num_preceding = static_cast<std::size_t>(kNumPreceding);
+ const std::size_t num_following = static_cast<std::size_t>(kNumFollowing);
+
+ for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ int count = 0;
+
+ if (argument_cpp_vector[i] != nullptr) {
+ sum += *argument_cpp_vector[i];
+ count++;
+ }
+
+ for (std::size_t precede = 1; precede <= num_preceding; ++precede) {
+ // Stop at the beginning of the input (tested first, so that the unsigned
+ // 'i - precede' can not wrap around) or when leaving the partition.
+ if (i < precede ||
+ i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition) {
+ break;
+ }
+
+ if (argument_cpp_vector[i - precede] != nullptr) {
+ sum += *argument_cpp_vector[i - precede];
+ count++;
+ }
+ }
+
+ // The following side of the frame is bounded by kNumFollowing (the
+ // original loop used kNumPreceding here).
+ for (std::size_t follow = 1; follow <= num_following; ++follow) {
+ // Stop at the end of the input or when leaving the partition.
+ if (i + follow >= argument_cpp_vector.size() ||
+ i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition) {
+ break;
+ }
+
+ if (argument_cpp_vector[i + follow] != nullptr) {
+ sum += *argument_cpp_vector[i + follow];
+ count++;
+ }
+ }
+
+ if (count == 0) {
+ result_cpp_vector.push_back(nullptr);
+ } else {
+ typename OutputType::cpptype *result_cpp_value =
+ new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+ result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
+
+ CheckAvgValues(result_cpp_vector, result);
+ }
+
+ std::unique_ptr<WindowAggregationHandle> handle_avg_;
+};
+
+// Specialization for double results: values are compared with
+// EXPECT_DOUBLE_EQ rather than exact equality, so that a difference of a few
+// ULPs between the expected and the computed average does not fail the test.
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValues<double>(
+ const std::vector<double*> &expected,
+ const ColumnVector *actual) {
+ EXPECT_TRUE(actual->isNative());
+ const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+ EXPECT_EQ(expected.size(), native->size());
+ for (std::size_t i = 0; i < expected.size(); ++i) {
+ if (expected[i] == nullptr) {
+ // No expected value: the computed entry has to be NULL.
+ EXPECT_TRUE(native->getTypedValue(i).isNull());
+ } else {
+ EXPECT_DOUBLE_EQ(*expected[i], native->getTypedValue(i).getLiteral<double>());
+ }
+ }
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+ int value, DatetimeIntervalLit *data) {
+ data->interval_ticks = value;  // Specialization: store 'value' in the literal's interval_ticks field.
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+ int value, YearMonthIntervalLit *data) {
+ data->months = value;  // Specialization: store 'value' in the literal's months field.
+}
+
+typedef WindowAggregationHandleAvgTest WindowAggregationHandleAvgDeathTest;
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+ checkWindowAggregationAvgGeneric<IntType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+ checkWindowAggregationAvgGeneric<LongType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+ checkWindowAggregationAvgGeneric<FloatType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+ checkWindowAggregationAvgGeneric<DoubleType>();  // OutputType defaults to DoubleType.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+ checkWindowAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();  // Expected averages are computed in the interval type itself.
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+ checkWindowAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();  // Expected averages are computed in the interval type itself.
+}
+
+#ifdef QUICKSTEP_DEBUG  // The death tests below are only compiled when QUICKSTEP_DEBUG is defined.
+TEST_F(WindowAggregationHandleAvgDeathTest, CharTypeTest) {
+ const Type &type = CharType::Instance(true, 10);
+ EXPECT_DEATH(initializeHandle(type), "");  // Creating the AVG handle for a CharType argument is expected to abort; the empty pattern matches any output.
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, VarTypeTest) {
+ const Type &type = VarCharType::Instance(true, 10);
+ EXPECT_DEATH(initializeHandle(type), "");  // Creating the AVG handle for a VarCharType argument is expected to abort; the empty pattern matches any output.
+}
+#endif  // QUICKSTEP_DEBUG
+
+TEST_F(WindowAggregationHandleAvgDeathTest, canApplyToTypeTest) {  // NOTE(review): contains no death assertion although it uses the DeathTest alias of the fixture.
+ EXPECT_TRUE(CanApplyToTypesTest(kInt));  // AVG is expected to accept the four numeric types ...
+ EXPECT_TRUE(CanApplyToTypesTest(kLong));
+ EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+ EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+ EXPECT_FALSE(CanApplyToTypesTest(kChar));  // ... to reject the string types and kDatetime ...
+ EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+ EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+ EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));  // ... and to accept both interval types.
+ EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, ResultTypeForArgumentTypeTest) {  // NOTE(review): contains no death assertion although it uses the DeathTest alias of the fixture.
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));  // Numeric arguments are expected to average to kDouble.
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));  // Interval arguments are expected to keep their own type.
+ EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_ExecutionHeuristics
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
// Get input.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new WindowAggregationOperator(query_handle_->query_id(),
+ *input_relation_info->relation,
*output_relation,
window_aggr_state_index,
insert_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
glog
- quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_Expression
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
#include <utility>
#include <vector>
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/Expression.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
}
WindowAggregateFunctionPtr WindowAggregateFunction::Create(
- const ::quickstep::AggregateFunction &window_aggregate,
+ const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..0cc80df 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,8 +33,8 @@
namespace quickstep {
-class AggregateFunction;
class Type;
+class WindowAggregateFunction;
namespace optimizer {
namespace expressions {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
* @return The WindowAggregateFunction singleton (from the expression system)
* for this node.
**/
- inline const ::quickstep::AggregateFunction& window_aggregate() const {
+ inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
return window_aggregate_;
}
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
* @param is_distinct Whether this is a DISTINCT aggregation.
* @return A new AggregateFunctionPtr.
**/
- static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+ static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
* @param window_info The window info of the window aggregate function.
* @param is_distinct Indicates whether this is a DISTINCT aggregation.
*/
- WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+ WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
// window_aggregate_. If it really needs to be seperated from the
// AggregationFunction, a new class for WindowAggregationFunction should be
// created as quickstep::WindowAggregateFunction.
- const ::quickstep::AggregateFunction &window_aggregate_;
+ const ::quickstep::WindowAggregateFunction &window_aggregate_;
std::vector<ScalarPtr> arguments_;
const WindowInfo window_info_;
const std::string window_name_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_expressions_tablegenerator_GeneratorFunction
quickstep_expressions_tablegenerator_GeneratorFunctionFactory
quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_parser_ParseAssignment
quickstep_parser_ParseBasicExpressions
quickstep_parser_ParseBlockProperties
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..c224388 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
#include "expressions/table_generator/GeneratorFunction.hpp"
#include "expressions/table_generator/GeneratorFunctionFactory.hpp"
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "parser/ParseAssignment.hpp"
#include "parser/ParseBasicExpressions.hpp"
#include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
<< "COUNT aggregate has both star (*) and non-star arguments.";
}
- // Try to look up the AggregateFunction by name using
- // AggregateFunctionFactory.
- const ::quickstep::AggregateFunction *aggregate
- = AggregateFunctionFactory::GetByName(function_name);
- if (aggregate == nullptr) {
+ // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+ // TODO(Shixuan): We might want to create a new abstract class Function to
+ // include both AggregateFunction and WindowAggregateFunction, which will make
+ // this part of code cleaner.
+ const ::quickstep::AggregateFunction *aggregate = nullptr;
+ const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
+ if (parse_function_call.isWindow()) {
+ window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+ } else {
+ aggregate = AggregateFunctionFactory::GetByName(function_name);
+ }
+
+ if (aggregate == nullptr && window_aggregate == nullptr) {
THROW_SQL_ERROR_AT(&parse_function_call)
<< "Unrecognized function name \""
<< parse_function_call.name()->value()
@@ -2656,11 +2666,14 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure a naked COUNT() with no arguments wasn't specified.
- if ((aggregate->getAggregationID() == AggregationID::kCount)
- && (resolved_arguments.empty())
- && (!count_star)) {
- THROW_SQL_ERROR_AT(&parse_function_call)
- << "COUNT aggregate requires an argument (either scalar or star (*))";
+ if ((aggregate != nullptr &&
+ aggregate->getAggregationID() == AggregationID::kCount) ||
+ (window_aggregate != nullptr &&
+ window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+ if ((resolved_arguments.empty()) && !count_star) {
+ THROW_SQL_ERROR_AT(&parse_function_call)
+ << "COUNT aggregate requires an argument (either scalar or star (*))";
+ }
}
// Resolve each of the Scalar arguments to the aggregate.
@@ -2670,7 +2683,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure that the aggregate can apply to the specified argument(s).
- if (!aggregate->canApplyToTypes(argument_types)) {
+ if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+ || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
THROW_SQL_ERROR_AT(&parse_function_call)
<< "Aggregate function " << aggregate->getName()
<< " can not apply to the given argument(s).";
@@ -2679,7 +2693,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
if (parse_function_call.isWindow()) {
return resolveWindowAggregateFunction(parse_function_call,
expression_resolution_info,
- aggregate,
+ window_aggregate,
resolved_arguments);
}
@@ -2705,7 +2719,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
E::ScalarPtr Resolver::resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *window_aggregate,
+ const ::quickstep::WindowAggregateFunction *window_aggregate,
const std::vector<E::ScalarPtr> &resolved_arguments) {
// A window aggregate function might be defined OVER a window name or a window.
E::WindowAggregateFunctionPtr window_aggregate_function;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
#include <unordered_set>
#include <vector>
-#include "query_optimizer/expressions/AggregateFunction.hpp"
#include "query_optimizer/expressions/Alias.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
* @param expression_resolution_info Resolution info that contains the name
* resolver and info to be updated after
* resolution.
- * @param aggregate The aggregate function.
+ * @param aggregate The window aggregate function.
* @param resolved_arguments The resolved arguments.
* @return An expression in the query optimizer.
*/
expressions::ScalarPtr resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *aggregate,
+ const ::quickstep::WindowAggregateFunction *aggregate,
const std::vector<expressions::ScalarPtr> &resolved_arguments);
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 16127cc..30a3c39 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -953,19 +953,79 @@ WHERE double_col < 0
==
# Window Aggregation Test.
-# Currently this is not supported, an empty table will be returned.
-SELECT avg(int_col) OVER w FROM test
+SELECT char_col, long_col, avg(long_col) OVER w FROM test
WINDOW w AS
-(PARTITION BY char_col
- ORDER BY long_col DESC NULLS LAST
+(ORDER BY char_col DESC NULLS LAST
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
--
-+------------------------+
-|avg(int_col) |
-+------------------------+
-+------------------------+
++--------------------+--------------------+------------------------+
+|char_col |long_col |avg(long_col) |
++--------------------+--------------------+------------------------+
+| 8 2.828427| 64| 64|
+| 6 2.449490| 36| 50|
+| 4 2.000000| 16| 38.666666666666664|
+| 24 4.898979| 576| 173|
+| 22 4.690416| 484| 235.19999999999999|
+| 20 4.472136| 400| 262.66666666666669|
+| 2 1.414214| 4| 225.71428571428572|
+| 18 4.242641| 324| 238|
+| 16 4.000000| 256| 240|
+| 14 3.741657| 196| 235.59999999999999|
+| 12 3.464102| 144| 227.27272727272728|
+| 10 3.162278| 100| 216.66666666666666|
+| 0 0.000000| 0| 200|
+| -9 3.000000| 81| 191.5|
+| -7 2.645751| 49| 182|
+| -5 2.236068| 25| 172.1875|
+| -3 1.732051| 9| 162.58823529411765|
+| -23 4.795832| 529| 182.94444444444446|
+| -21 4.582576| 441| 196.52631578947367|
+| -19 4.358899| 361| 204.75|
+| -17 4.123106| 289| 208.76190476190476|
+| -15 3.872983| 225| 209.5|
+| -13 3.605551| 169| 207.7391304347826|
+| -11 3.316625| 121| 204.125|
+| -1 1.000000| 1| 196|
++--------------------+--------------------+------------------------+
==
+SELECT long_col, int_col, avg(int_col) OVER w FROM test
+WINDOW w AS
+(ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
+--
++--------------------+-----------+------------------------+
+|long_col |int_col |avg(int_col) |
++--------------------+-----------+------------------------+
+| 576| 24| 7.666666666666667|
+| 529| -23| 0.5|
+| 484| 22| 0.5|
+| 441| -21| -10.25|
+| 400| NULL| 0|
+| 361| -19| -9.75|
+| 324| 18| -0.5|
+| 289| -17| -3.3999999999999999|
+| 256| 16| 3.2000000000000002|
+| 225| -15| -3|
+| 196| 14| 2.7999999999999998|
+| 169| -13| -2.6000000000000001|
+| 144| 12| 0.5|
+| 121| -11| -5.25|
+| 100| NULL| 0|
+| 81| -9| -4.75|
+| 64| 8| -0.5|
+| 49| -7| -1.3999999999999999|
+| 36| 6| 1.2|
+| 25| -5| -1|
+| 16| 4| 0.80000000000000004|
+| 9| -3| -0.59999999999999998|
+| 4| 2| 0.5|
+| 1| -1| -0.66666666666666663|
+| 0| NULL| 0.5|
++--------------------+-----------+------------------------+
+==
+
+# Currently this is not supported, an empty table will be returned.
SELECT int_col, sum(float_col) OVER
(PARTITION BY char_col, long_col
ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
@@ -987,5 +1047,5 @@ WINDOW w AS
+------------------------+
|sum(avg(int_col)) |
+------------------------+
-| NULL|
+| -18|
+------------------------+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
+ quickstep_storage_WindowAggregationOperationState
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..3149864 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
#include <vector>
+#include "catalog/CatalogRelation.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
#include "tmb/id_typedefs.h"
@@ -40,10 +42,14 @@ bool WindowAggregationOperator::getAllWorkOrders(
DCHECK(query_context != nullptr);
if (blocking_dependencies_met_ && !generated_) {
+ std::vector<block_id> relation_blocks =
+ input_relation_.getBlocksSnapshot();
+
container->addNormalWorkOrder(
new WindowAggregationWorkOrder(
query_id_,
query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+ std::move(relation_blocks),
query_context->getInsertDestination(output_destination_index_)),
op_index_);
generated_ = true;
@@ -67,6 +73,11 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
proto->set_query_id(query_id_);
proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
window_aggregation_state_index_);
+
+ const std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+ for (const block_id bid : relation_blocks) {
+ proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+ }
proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
output_destination_index_);
@@ -75,8 +86,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
void WindowAggregationWorkOrder::execute() {
- std::cout << "Window aggregation is not supported yet.\n"
- << "An empty table is returned\n";
+ state_->windowAggregateBlocks(output_destination_,
+ block_ids_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..bd83248 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
*
* @param query_id The ID of this query.
* @param input_relation The relation to perform aggregation over.
+ * @param output_relation The relation for output.
* @param window_aggregation_state_index The index of WindowAggregationState
* in QueryContext.
* @param output_destination_index The index of InsertDestination in
* QueryContext for the output.
**/
WindowAggregationOperator(const std::size_t query_id,
+ const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::window_aggregation_state_id window_aggregation_state_index,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
window_aggregation_state_index_(window_aggregation_state_index),
output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
**/
serialization::WorkOrder* createWorkOrderProto();
+ const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
const QueryContext::insert_destination_id output_destination_index_;
@@ -117,43 +121,25 @@ class WindowAggregationWorkOrder : public WorkOrder {
*
* @param query_id The ID of this query.
* @param state The WindowAggregationOperatorState to use.
+ * @param block_ids The blocks' id of the input relation.
* @param output_destination The InsertDestination for output.
**/
WindowAggregationWorkOrder(const std::size_t query_id,
WindowAggregationOperationState *state,
+ std::vector<block_id> &&block_ids,
InsertDestination *output_destination)
: WorkOrder(query_id),
state_(state),
+ block_ids_(std::move(block_ids)),
output_destination_(output_destination) {}
~WindowAggregationWorkOrder() override {}
- /**
- * @brief Get the pointer to WindowAggregationOperationState.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the window aggregation operation state.
- **/
- WindowAggregationOperationState* state() {
- return state_;
- }
-
- /**
- * @brief Get the pointer to output destination.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the output destination.
- **/
- InsertDestination* output_destination() {
- return output_destination_;
- }
-
void execute() override;
private:
WindowAggregationOperationState *state_;
+ const std::vector<block_id> block_ids_;
InsertDestination *output_destination_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
extend WorkOrder {
// All required
optional uint32 window_aggr_state_index = 336;
- optional int32 insert_destination_index = 337;
+ repeated fixed64 block_ids = 337;
+ optional int32 insert_destination_index = 338;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..582effd 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,26 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
quickstep_catalog_CatalogTypedefs
quickstep_expressions_ExpressionFactories
quickstep_expressions_Expressions_proto
- quickstep_expressions_aggregation_AggregateFunction
- quickstep_expressions_aggregation_AggregateFunctionFactory
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationID
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_storage_InsertDestination
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
quickstep_storage_WindowAggregationOperationState_proto
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorUtil
+ quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
- quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_expressions_Expressions_proto
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
${PROTOBUF_LIBRARY})
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..0cdfc1a 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,14 +31,21 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/ExpressionFactories.hpp"
#include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/ColumnVectorUtil.hpp"
#include "glog/logging.h"
@@ -46,23 +53,21 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
- std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
arguments_(std::move(arguments)),
- partition_by_attributes_(std::move(partition_by_attributes)),
is_row_(is_row),
num_preceding_(num_preceding),
num_following_(num_following),
storage_manager_(storage_manager) {
// Get the Types of this window aggregate's arguments so that we can create an
// AggregationHandle.
- // TODO(Shixuan): Next step: New handles for window aggregation function.
std::vector<const Type*> argument_types;
for (const std::unique_ptr<const Scalar> &argument : arguments_) {
argument_types.emplace_back(&argument->getType());
@@ -71,28 +76,18 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Check if window aggregate function could apply to the arguments.
DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+ // IDs and types of partition keys.
+ std::vector<const Type*> partition_by_types;
+ for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+ partition_by_ids_.push_back(
+ partition_by_attribute->getAttributeIdForValueAccessor());
+ partition_by_types.push_back(&partition_by_attribute->getType());
+ }
+
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(argument_types));
- window_aggregation_state_.reset(
- window_aggregation_handle_->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this window aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy elision
- // when actually performing the window aggregation.
- arguments_as_attributes_.reserve(arguments_.size());
- for (const std::unique_ptr<const Scalar> &argument : arguments_) {
- const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- arguments_as_attributes_.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
- arguments_as_attributes_.push_back(argument_id);
- }
- }
-#endif
+ window_aggregate_function->createHandle(std::move(argument_types),
+ std::move(partition_by_types)));
}
WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -101,10 +96,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
- // Rebuild contructor arguments from their representation in 'proto'.
- const AggregateFunction *aggregate_function
- = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
-
std::vector<std::unique_ptr<const Scalar>> arguments;
arguments.reserve(proto.arguments_size());
for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
@@ -126,10 +117,10 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_preceding = proto.num_preceding();
const std::int64_t num_following = proto.num_following();
- return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_function,
+ return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
std::move(arguments),
- std::move(partition_by_attributes),
+ partition_by_attributes,
is_row,
num_preceding,
num_following,
@@ -139,11 +130,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
- !database.hasRelationWithId(proto.relation_id())) {
+ !database.hasRelationWithId(proto.input_relation_id())) {
return false;
}
- if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+ if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
return false;
}
@@ -176,4 +167,122 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
return true;
}
+void WindowAggregationOperationState::windowAggregateBlocks(
+ InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids) {
+ // TODO(Shixuan): This is a quick fix for currently unsupported functions in
+ // order to pass the query_optimizer test.
+ if (window_aggregation_handle_.get() == nullptr) {
+ std::cout << "The function will be supported in the near future :)\n";
+ return;
+ }
+
+ // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
+ // work. This will need Order Key to be passed so that we know where the
+ // window should start and end.
+ if (!is_row_) {
+ std::cout << "Currently we don't support RANGE frame mode :(\n";
+ return;
+ }
+
+ // Get the total number of tuples.
+ int num_tuples = 0;
+ for (const block_id block_idx : block_ids) {
+ num_tuples +=
+ storage_manager_->getBlock(block_idx, input_relation_)->getNumTuples();
+ }
+
+ // Construct column vectors for attributes.
+ std::vector<ColumnVector*> attribute_vecs;
+ for (std::size_t attr_id = 0; attr_id < input_relation_.size(); ++attr_id) {
+ const CatalogAttribute *attr = input_relation_.getAttributeById(attr_id);
+ const Type &type = attr->getType();
+
+ if (NativeColumnVector::UsableForType(type)) {
+ attribute_vecs.push_back(new NativeColumnVector(type, num_tuples));
+ } else {
+ attribute_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+ }
+ }
+
+ // Construct column vectors for arguments.
+ std::vector<ColumnVector*> argument_vecs;
+ for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+ const Type &type = argument->getType();
+
+ if (NativeColumnVector::UsableForType(type)) {
+ argument_vecs.push_back(new NativeColumnVector(type, num_tuples));
+ } else {
+ argument_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+ }
+ }
+
+ // TODO(Shixuan): Add Support for Vector Copy Elision Selection.
+ // Add tuples and arguments into ColumnVectors.
+ for (const block_id block_idx : block_ids) {
+ BlockReference block = storage_manager_->getBlock(block_idx, input_relation_);
+ const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+ SubBlocksReference sub_block_ref(tuple_block,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+ for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+ argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
+ &sub_block_ref));
+ }
+
+ InvokeOnAnyValueAccessor(tuple_accessor,
+ [&] (auto *tuple_accessor) -> void { // NOLINT(build/c++11)
+ tuple_accessor->beginIteration();
+ argument_accessor->beginIteration();
+
+ while (tuple_accessor->next() && argument_accessor->next()) {
+ for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
+ ColumnVector *attr_vec = attribute_vecs[attr_id];
+ if (attr_vec->isNative()) {
+ static_cast<NativeColumnVector*>(attr_vec)->appendTypedValue(
+ tuple_accessor->getTypedValue(attr_id));
+ } else {
+ static_cast<IndirectColumnVector*>(attr_vec)->appendTypedValue(
+ tuple_accessor->getTypedValue(attr_id));
+ }
+ }
+
+ for (std::size_t argument_idx = 0;
+ argument_idx < argument_vecs.size();
+ ++argument_idx) {
+ ColumnVector *argument = argument_vecs[argument_idx];
+ if (argument->isNative()) {
+ static_cast<NativeColumnVector*>(argument)->appendTypedValue(
+ argument_accessor->getTypedValue(argument_idx));
+ } else {
+ static_cast<IndirectColumnVector*>(argument)->appendTypedValue(
+ argument_accessor->getTypedValue(argument_idx));
+ }
+ }
+ }
+ });
+ }
+
+ // Construct the value accessor for tuples in all blocks
+ ColumnVectorsValueAccessor *all_blocks_accessor
+ = new ColumnVectorsValueAccessor();
+ for (ColumnVector *attr_vec : attribute_vecs) {
+ all_blocks_accessor->addColumn(attr_vec);
+ }
+
+ // Do actual calculation in handle.
+ ColumnVector *window_aggregates =
+ window_aggregation_handle_->calculate(all_blocks_accessor,
+ std::move(argument_vecs),
+ partition_by_ids_,
+ is_row_,
+ num_preceding_,
+ num_following_);
+
+ all_blocks_accessor->addColumn(window_aggregates);
+ output_destination->bulkInsertTuples(all_blocks_accessor);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..9792a99 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,20 +25,20 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
#include "utility/Macros.hpp"
namespace quickstep {
-class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
+class WindowAggregateFunction;
/** \addtogroup Storage
* @{
@@ -63,13 +63,12 @@ class WindowAggregationOperationState {
* current row. -1 means UNBOUNDED PRECEDING.
* @param num_following The number of rows/range for the tuples following the
* current row. -1 means UNBOUNDED FOLLOWING.
- * @param storage_manager The StorageManager to use for allocating hash
- * tables.
+ * @param storage_manager The StorageManager to get block references.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
- std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
@@ -107,66 +106,29 @@ class WindowAggregationOperationState {
const CatalogDatabaseLite &database);
/**
- * @brief Get the is_row info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return True if the frame mode is ROW, false if it is RANGE.
- **/
- const bool is_row() const { return is_row_; }
-
- /**
- * @brief Get the num_preceding info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return The number of rows/range that precedes the current row.
- **/
- const std::int64_t num_preceding() const { return num_preceding_; }
-
- /**
- * @brief Get the num_following info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
+ * @brief Compute window aggregates on the tuples of the given relation.
*
- * @return The number of rows/range that follows the current row.
+ * @param output_destination The output destination for the computed window
+ * aggregate.
+ * @param block_ids The id of the blocks to be computed.
**/
- const std::int64_t num_following() const { return num_following_; }
-
- /**
- * @brief Get the pointer to StorageManager.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the storage manager.
- **/
- StorageManager *storage_manager() { return storage_manager_; }
+ void windowAggregateBlocks(InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids);
private:
const CatalogRelationSchema &input_relation_;
-
- // TODO(Shixuan): Handle and State for window aggregation will be needed for
- // actual calculation.
- std::unique_ptr<AggregationHandle> window_aggregation_handle_;
- std::unique_ptr<AggregationState> window_aggregation_state_;
+ const std::vector<block_id> block_ids_;
+ std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
std::vector<std::unique_ptr<const Scalar>> arguments_;
+ std::vector<attribute_id> partition_by_ids_;
- // We don't add order_by_attributes here since it is not needed after sorting.
- std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
- // Window framing information.
+ // Frame info.
const bool is_row_;
const std::int64_t num_preceding_;
const std::int64_t num_following_;
StorageManager *storage_manager_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector<attribute_id> arguments_as_attributes_;
-#endif
-
DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..d888461 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,12 +19,12 @@ syntax = "proto2";
package quickstep.serialization;
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
import "expressions/Expressions.proto";
message WindowAggregationOperationState {
- required int32 relation_id = 1;
- required AggregateFunction function = 2;
+ required int32 input_relation_id = 1;
+ required WindowAggregateFunction function = 2;
repeated Scalar arguments = 3;
repeated Scalar partition_by_attributes = 4;
required bool is_row = 5;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "storage/WindowAggregationOperationState.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(kInvalidTableId);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(kInvalidTableId);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kInvalidNum);
proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);
[4/4] incubator-quickstep git commit: - Supported ROWS mode for AVG
window aggregation. - Created WindowAggregateFunctions in
expressions/window_aggregation. - Created WindowAggregationHandle for AVG to
actually do the calculation. - Other functions will be supported in future PRs. - RANGE mode is not supported yet.
Posted by zu...@apache.org.
- Supported ROWS mode for AVG window aggregation.
- Created WindowAggregateFunctions in expressions/window_aggregation.
- Created WindowAggregationHandle for AVG to actually do the calculation.
- Other functions will be supported in future PRs.
- RANGE mode is not supported yet.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c0bb4620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c0bb4620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c0bb4620
Branch: refs/heads/SQL-window-aggregation
Commit: c0bb4620ca7d3e2ba6334efc009376164d2cd6f8
Parents: 2b78380
Author: shixuan-fan <sh...@apache.org>
Authored: Fri Jul 8 18:23:47 2016 +0000
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Jul 25 13:21:40 2016 -0700
----------------------------------------------------------------------
expressions/CMakeLists.txt | 1 +
expressions/window_aggregation/CMakeLists.txt | 206 ++++++++++
.../WindowAggregateFunction.cpp | 58 +++
.../WindowAggregateFunction.hpp | 149 +++++++
.../WindowAggregateFunction.proto | 32 ++
.../WindowAggregateFunctionAvg.cpp | 85 ++++
.../WindowAggregateFunctionAvg.hpp | 75 ++++
.../WindowAggregateFunctionCount.cpp | 59 +++
.../WindowAggregateFunctionCount.hpp | 75 ++++
.../WindowAggregateFunctionFactory.cpp | 106 +++++
.../WindowAggregateFunctionFactory.hpp | 96 +++++
.../WindowAggregateFunctionMax.cpp | 67 ++++
.../WindowAggregateFunctionMax.hpp | 75 ++++
.../WindowAggregateFunctionMin.cpp | 66 ++++
.../WindowAggregateFunctionMin.hpp | 75 ++++
.../WindowAggregateFunctionSum.cpp | 82 ++++
.../WindowAggregateFunctionSum.hpp | 75 ++++
.../WindowAggregationHandle.hpp | 137 +++++++
.../WindowAggregationHandleAvg.cpp | 241 ++++++++++++
.../WindowAggregationHandleAvg.hpp | 101 +++++
.../window_aggregation/WindowAggregationID.hpp | 44 +++
.../WindowAggregationHandleAvg_unittest.cpp | 387 +++++++++++++++++++
query_optimizer/CMakeLists.txt | 2 +
query_optimizer/ExecutionGenerator.cpp | 5 +-
query_optimizer/expressions/CMakeLists.txt | 2 +-
.../expressions/WindowAggregateFunction.cpp | 4 +-
.../expressions/WindowAggregateFunction.hpp | 10 +-
query_optimizer/resolver/CMakeLists.txt | 2 +
query_optimizer/resolver/Resolver.cpp | 40 +-
query_optimizer/resolver/Resolver.hpp | 5 +-
.../tests/execution_generator/Select.test | 78 +++-
relational_operators/CMakeLists.txt | 1 +
.../WindowAggregationOperator.cpp | 15 +-
.../WindowAggregationOperator.hpp | 30 +-
relational_operators/WorkOrder.proto | 3 +-
storage/CMakeLists.txt | 17 +-
storage/WindowAggregationOperationState.cpp | 183 +++++++--
storage/WindowAggregationOperationState.hpp | 68 +---
storage/WindowAggregationOperationState.proto | 6 +-
...WindowAggregationOperationState_unittest.cpp | 14 +-
40 files changed, 2613 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index 53ad5d4..6ef3c24 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(aggregation)
add_subdirectory(predicate)
add_subdirectory(scalar)
add_subdirectory(table_generator)
+add_subdirectory(window_aggregation)
QS_PROTOBUF_GENERATE_CPP(expressions_Expressions_proto_srcs
expressions_Expressions_proto_hdrs
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
new file mode 100644
index 0000000..6a16fcc
--- /dev/null
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+QS_PROTOBUF_GENERATE_CPP(expressions_windowaggregation_WindowAggregateFunction_proto_srcs
+ expressions_windowaggregation_WindowAggregateFunction_proto_hdrs
+ WindowAggregateFunction.proto)
+
+# Declare micro-libs:
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ WindowAggregateFunction.cpp
+ WindowAggregateFunction.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${expressions_windowaggregation_WindowAggregateFunction_proto_srcs})
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ WindowAggregateFunctionAvg.cpp
+ WindowAggregateFunctionAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+ WindowAggregateFunctionCount.cpp
+ WindowAggregateFunctionCount.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ WindowAggregateFunctionFactory.cpp
+ WindowAggregateFunctionFactory.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+ WindowAggregateFunctionMax.cpp
+ WindowAggregateFunctionMax.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+ WindowAggregateFunctionMin.cpp
+ WindowAggregateFunctionMin.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+ WindowAggregateFunctionSum.cpp
+ WindowAggregateFunctionSum.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ ../../empty_src.cpp
+ WindowAggregationHandle.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ WindowAggregationHandleAvg.cpp
+ WindowAggregationHandleAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationID
+ ../../empty_src.cpp
+ WindowAggregationID.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_operations_comparisons_Comparison
+ quickstep_types_operations_comparisons_ComparisonFactory
+ quickstep_types_operations_comparisons_ComparisonID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_operations_comparisons_Comparison
+ quickstep_types_operations_comparisons_ComparisonFactory
+ quickstep_types_operations_comparisons_ComparisonID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ glog
+ quickstep_catalog_CatalogRelationSchema
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_StorageBlockInfo
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_types_operations_comparisons_Comparison
+ quickstep_types_operations_comparisons_ComparisonFactory
+ quickstep_types_operations_comparisons_ComparisonID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_storage_ValueAccessor
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_types_operations_comparisons_Comparison
+ quickstep_utility_Macros)
+
+# Submodule all-in-one library:
+add_library(quickstep_expressions_windowaggregation ../../empty_src.cpp)
+target_link_libraries(quickstep_expressions_windowaggregation
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionCount
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionMax
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionMin
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_expressions_windowaggregation_WindowAggregationID)
+
+# Tests:
+
+# Unified executable to amortize cost of linking.
+add_executable(WindowAggregationHandle_tests
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/WindowAggregationHandleAvg_unittest.cpp")
+target_link_libraries(WindowAggregationHandle_tests
+ gtest
+ gtest_main
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_storage_ValueAccessor
+ quickstep_types_CharType
+ quickstep_types_DateOperatorOverloads
+ quickstep_types_DatetimeIntervalType
+ quickstep_types_DatetimeType
+ quickstep_types_DoubleType
+ quickstep_types_FloatType
+ quickstep_types_IntType
+ quickstep_types_IntervalLit
+ quickstep_types_LongType
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_VarCharType
+ quickstep_types_YearMonthIntervalType
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor)
+add_test(WindowAggregationHandle_tests WindowAggregationHandle_tests)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.cpp b/expressions/window_aggregation/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..3911e1c
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.cpp
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+serialization::WindowAggregateFunction WindowAggregateFunction::getProto() const {
+ serialization::WindowAggregateFunction proto;  // Only the ID field is populated below.
+ switch (win_agg_id_) {  // Translate the in-memory ID into the matching proto enum value.
+ case WindowAggregationID::kAvg:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
+ break;
+ case WindowAggregationID::kCount:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::COUNT);
+ break;
+ case WindowAggregationID::kMax:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::MAX);
+ break;
+ case WindowAggregationID::kMin:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::MIN);
+ break;
+ case WindowAggregationID::kSum:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::SUM);
+ break;
+ default: {  // An ID with no proto counterpart is treated as a fatal error.
+ LOG(FATAL) << "Unrecognized WindowAggregationID: "
+ << static_cast<std::underlying_type<WindowAggregationID>::type>(win_agg_id_);  // Log the raw integer value.
+ }
+ }
+
+ return proto;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..e40479b
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A class representing a particular window aggregate function in the
+ * abstract sense. Each named aggregate function is represented by a
+ * singleton subclass of WindowAggregateFunction.
+ *
+ * WindowAggregateFunction provides informational methods about the
+ * applicability of a particular window aggregate function to particular
+ * argument Type(s). The actual implementation of the window aggregate
+ * functions' logic is in the WindowAggregationHandle class hierarchy, and can
+ * be different depending on the particular argument Type(s) given to the window
+ * aggregate. To perform a window aggregation, a caller should first call
+ * WindowAggregateFunction::createHandle() to instantiate a
+ * WindowAggregationHandle object, then use the methods of
+ * WindowAggregationHandle to do the actual window aggregation. Finally, delete
+ * the WindowAggregationHandle once finished.
+ *
+ * See WindowAggregationHandle for more detailed information about how
+ * window aggregates are actually computed.
+ **/
+class WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the ID of this window aggregate (i.e. its unique ID amongst all
+ * the WindowAggregateFunctions).
+ *
+ * @return The WindowAggregationID of this WindowAggregateFunction.
+ **/
+ inline WindowAggregationID getWindowAggregationID() const {
+ return win_agg_id_;
+ }
+
+ /**
+ * @brief Get the human-readable name of this WindowAggregateFunction.
+ *
+ * @return The human-readable name of this WindowAggregateFunction.
+ **/
+ virtual std::string getName() const = 0;
+
+ /**
+ * @brief Get the serialized protocol buffer representation of this
+ * WindowAggregateFunction.
+ *
+ * @return A protocol buffer whose window_aggregation_id field identifies
+ * this WindowAggregateFunction.
+ **/
+ virtual serialization::WindowAggregateFunction getProto() const;
+
+ /**
+ * @brief Determine if this WindowAggregateFunction can be applied to
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return Whether this WindowAggregateFunction is applicable to the given
+ * argument_types.
+ **/
+ virtual bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Determine the result Type for this WindowAggregateFunction given
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return The result Type for this WindowAggregateFunction applied to the
+ * specified argument_types, or nullptr if this
+ * WindowAggregateFunction is not applicable to the specified Type(s).
+ **/
+ virtual const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Create a WindowAggregationHandle to compute window aggregates.
+ *
+ * @warning It is an error to call this method for argument_types which this
+ * WindowAggregateFunction can not apply to. For safety, check
+ * canApplyToTypes() first.
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @param partition_key_types A list of zero or more Types for partition keys
+ * to this WindowAggregateFunction.
+ *
+ * @return A new WindowAggregationHandle that can be used to compute this
+ * WindowAggregateFunction over the specified argument_types. Caller
+ * is responsible for deleting the returned object.
+ **/
+ virtual WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const = 0;
+
+ protected:  // Construction is reserved for subclasses.
+ explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
+ : win_agg_id_(win_agg_id) {
+ }
+
+ private:
+ const WindowAggregationID win_agg_id_;  // Fixed at construction; exposed via getWindowAggregationID().
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunction.proto
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.proto b/expressions/window_aggregation/WindowAggregateFunction.proto
new file mode 100644
index 0000000..fe8d799
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.proto
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+message WindowAggregateFunction {  // Serialized identity of a window aggregate function.
+ enum WindowAggregationID {  // One value per function that WindowAggregateFunction::getProto() can emit.
+ AVG = 0;
+ COUNT = 1;
+ MAX = 2;
+ MIN = 3;
+ SUM = 4;
+ }
+
+ required WindowAggregationID window_aggregation_id = 1;  // Which window aggregate function this message denotes.
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
new file mode 100644
index 0000000..bc31a53
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionAvg::canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const {
+ // AVG is unary: anything other than exactly one argument is rejected.
+ if (argument_types.size() != 1) {
+ return false;
+ }
+
+ // The argument must be addable to itself and divisible by a double.
+ return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+ .canApplyToTypes(*argument_types.front(), *argument_types.front()) &&
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+ .canApplyToTypes(*argument_types.front(), TypeFactory::GetType(kDouble));
+}
+
+const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const {
+ if (!canApplyToTypes(argument_types)) {  // Inapplicable arguments have no result type.
+ return nullptr;
+ }
+
+ // The type used to sum values is nullable, and we automatically widen int to
+ // long and float to double to have more headroom when adding up many values.
+ const Type *sum_type = &(argument_types.front()->getNullableVersion());
+ switch (sum_type->getTypeID()) {
+ case kInt:
+ sum_type = &TypeFactory::GetType(kLong, true);
+ break;
+ case kFloat:
+ sum_type = &TypeFactory::GetType(kDouble, true);
+ break;
+ default:  // Every other type is summed in its own nullable version.
+ break;
+ }
+
+ return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)  // Result type of (sum / double).
+ .resultTypeForArgumentTypes(*sum_type, TypeFactory::GetType(kDouble));
+}
+
+WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const {
+ DCHECK(canApplyToTypes(argument_types))  // Callers are expected to have checked canApplyToTypes() already.
+ << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
+ << " that AVG can not be applied to.";
+
+ return new WindowAggregationHandleAvg(partition_key_types,  // Heap-allocated; the caller must delete it.
+ *argument_types.front());
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
new file mode 100644
index 0000000..32fd9d5
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL AVG() OVER term.
+ **/
+class WindowAggregateFunctionAvg : public WindowAggregateFunction {
+ public:
+ static const WindowAggregateFunctionAvg& Instance() {
+ static WindowAggregateFunctionAvg instance;  // The one shared instance, constructed on first call.
+ return instance;
+ }
+
+ std::string getName() const override {
+ return "AVG";
+ }
+
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;  // True only for one addable, divisible argument.
+
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;  // nullptr when AVG is not applicable.
+
+ WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const override;  // Returns a new WindowAggregationHandleAvg.
+
+ private:  // Private constructor: Instance() is the only way to obtain the object.
+ WindowAggregateFunctionAvg()
+ : WindowAggregateFunction(WindowAggregationID::kAvg) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
new file mode 100644
index 0000000..504e000
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionCount.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionCount::canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const {
+ // COUNT may be nullary (i.e. COUNT(*)) or unary; the argument's Type is not inspected.
+ return argument_types.size() <= 1;
+}
+
+const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const {
+ if (!canApplyToTypes(argument_types)) {  // More than one argument: no result type.
+ return nullptr;
+ }
+
+ return &TypeFactory::GetType(kLong);  // COUNT's result is the long type, whatever the argument is.
+}
+
+WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const {
+ DCHECK(canApplyToTypes(argument_types))  // Callers are expected to have checked canApplyToTypes() already.
+ << "Attempted to create a WindowAggregationHandleCount for argument Types "
+ << "that COUNT can not be applied to (> 1 argument).";
+
+ // TODO(Shixuan): Add handle for Count.
+ return nullptr;  // No COUNT handle exists yet, so callers currently receive a null pointer.
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
new file mode 100644
index 0000000..1b40fdd
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL COUNT() OVER term.
+ **/
+class WindowAggregateFunctionCount : public WindowAggregateFunction {
+ public:
+ static const WindowAggregateFunctionCount& Instance() {
+ static WindowAggregateFunctionCount instance;  // The one shared instance, constructed on first call.
+ return instance;
+ }
+
+ std::string getName() const override {
+ return "COUNT";
+ }
+
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;  // True for zero or one argument.
+
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;  // nullptr when COUNT is not applicable.
+
+ WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const override;  // Currently returns nullptr (handle not implemented).
+
+ private:  // Private constructor: Instance() is the only way to obtain the object.
+ WindowAggregateFunctionCount()
+ : WindowAggregateFunction(WindowAggregationID::kCount) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionCount);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_COUNT_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
new file mode 100644
index 0000000..65247f2
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+
+#include <string>
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionCount.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionMax.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionMin.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionSum.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Maps a WindowAggregationID onto the singleton instance of the matching
+// concrete WindowAggregateFunction. An ID with no matching case is fatal.
+const WindowAggregateFunction& WindowAggregateFunctionFactory::Get(
+    const WindowAggregationID agg_id) {
+  // Integer representation of the ID; needed only to print it in the fatal
+  // log message below.
+  using RawID = std::underlying_type<WindowAggregationID>::type;
+
+  switch (agg_id) {
+    case WindowAggregationID::kAvg:
+      return WindowAggregateFunctionAvg::Instance();
+    case WindowAggregationID::kCount:
+      return WindowAggregateFunctionCount::Instance();
+    case WindowAggregationID::kMax:
+      return WindowAggregateFunctionMax::Instance();
+    case WindowAggregationID::kMin:
+      return WindowAggregateFunctionMin::Instance();
+    case WindowAggregationID::kSum:
+      return WindowAggregateFunctionSum::Instance();
+    default:
+      break;
+  }
+
+  // Reached only when no case above matched.
+  LOG(FATAL) << "Unrecognized WindowAggregationID: "
+             << static_cast<RawID>(agg_id);
+}
+
+// Looks up a WindowAggregateFunction by its SQL name. The comparison is an
+// exact string match against the lower-case names below; any other input
+// yields nullptr.
+const WindowAggregateFunction* WindowAggregateFunctionFactory::GetByName(
+    const std::string &name) {
+  if (name == "avg") {
+    return &WindowAggregateFunctionAvg::Instance();
+  }
+  if (name == "count") {
+    return &WindowAggregateFunctionCount::Instance();
+  }
+  if (name == "max") {
+    return &WindowAggregateFunctionMax::Instance();
+  }
+  if (name == "min") {
+    return &WindowAggregateFunctionMin::Instance();
+  }
+  if (name == "sum") {
+    return &WindowAggregateFunctionSum::Instance();
+  }
+
+  // No known window aggregate function has this name.
+  return nullptr;
+}
+
+// A proto is valid when it is fully initialized and its window_aggregation_id
+// is one of the values known to the protobuf enum.
+bool WindowAggregateFunctionFactory::ProtoIsValid(
+    const serialization::WindowAggregateFunction &proto) {
+  if (!proto.IsInitialized()) {
+    return false;
+  }
+
+  return serialization::WindowAggregateFunction::WindowAggregationID_IsValid(
+      proto.window_aggregation_id());
+}
+
+// Returns the singleton WindowAggregateFunction named by the proto's
+// window_aggregation_id. Proto validity is only asserted through DCHECK; an
+// ID with no matching case is fatal.
+const WindowAggregateFunction& WindowAggregateFunctionFactory::ReconstructFromProto(
+    const serialization::WindowAggregateFunction &proto) {
+  DCHECK(ProtoIsValid(proto))
+      << "Attempted to reconstruct an WindowAggregateFunction from an invalid proto:\n"
+      << proto.DebugString();
+
+  const auto proto_agg_id = proto.window_aggregation_id();
+  switch (proto_agg_id) {
+    case serialization::WindowAggregateFunction::AVG:
+      return WindowAggregateFunctionAvg::Instance();
+    case serialization::WindowAggregateFunction::COUNT:
+      return WindowAggregateFunctionCount::Instance();
+    case serialization::WindowAggregateFunction::MAX:
+      return WindowAggregateFunctionMax::Instance();
+    case serialization::WindowAggregateFunction::MIN:
+      return WindowAggregateFunctionMin::Instance();
+    case serialization::WindowAggregateFunction::SUM:
+      return WindowAggregateFunctionSum::Instance();
+    default:
+      break;
+  }
+
+  // Reached only when no case above matched.
+  LOG(FATAL) << "Unrecognized serialization::WindowAggregateFunction::WindowAggregationID: "
+             << proto_agg_id
+             << "\nFull proto debug string:\n"
+             << proto.DebugString();
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
new file mode 100644
index 0000000..1d59e93
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+
+#include <string>
+
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregateFunction;
+namespace serialization { class WindowAggregateFunction; }
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief Namespace of free factory functions that provide access to the
+ * various concrete implementations of WindowAggregateFunction.
+ *
+ * WindowAggregateFunctionFactory allows client code to use any
+ * WindowAggregateFunction in Quickstep in a generic way without having to know
+ * about all the specific subclasses of WindowAggregateFunction. In particular,
+ * it is used to deserialize WindowAggregateFunctions used in
+ * WindowAggregationOperationState from their protobuf representations
+ * (originally created by the optimizer) when deserializing a QueryContext.
+ **/
+namespace WindowAggregateFunctionFactory {
+ /**
+ * @brief Get a particular WindowAggregateFunction by its ID.
+ *
+ * @note An agg_id that does not correspond to a known function is treated
+ * as a fatal error (LOG(FATAL)).
+ *
+ * @param agg_id The ID of the desired WindowAggregateFunction.
+ * @return A reference to the singleton instance of the
+ * WindowAggregateFunction specified by agg_id.
+ **/
+ const WindowAggregateFunction& Get(const WindowAggregationID agg_id);
+
+ /**
+ * @brief Get a particular WindowAggregateFunction by its name in SQL syntax.
+ *
+ * @note The name is matched exactly against "avg", "count", "max", "min"
+ * and "sum"; no case folding is done here.
+ *
+ * @param name The name of the desired WindowAggregateFunction in lower case.
+ * @return A pointer to the WindowAggregateFunction specified by name, or
+ * nullptr if name does not match any known WindowAggregateFunction.
+ **/
+ const WindowAggregateFunction* GetByName(const std::string &name);
+
+ /**
+ * @brief Determine if a serialized protobuf representation of a
+ * WindowAggregateFunction is fully-formed and valid.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction to check for validity.
+ * @return Whether proto is fully-formed and valid, i.e. it is initialized
+ * and its window_aggregation_id is a valid enum value.
+ **/
+ bool ProtoIsValid(const serialization::WindowAggregateFunction &proto);
+
+ /**
+ * @brief Get the WindowAggregateFunction represented by a proto.
+ *
+ * @warning It is an error to call this method with an invalid proto.
+ * ProtoIsValid() should be called first to check. This method
+ * itself only asserts validity with DCHECK, and treats an
+ * unrecognized window_aggregation_id as a fatal error.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction.
+ * @return The WindowAggregateFunction represented by proto.
+ **/
+ const WindowAggregateFunction& ReconstructFromProto(
+ const serialization::WindowAggregateFunction &proto);
+
+} // namespace WindowAggregateFunctionFactory
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
new file mode 100644
index 0000000..f3997c7
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionMax.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// MAX accepts exactly one argument, and that argument's type must be
+// comparable with itself using '>'.
+bool WindowAggregateFunctionMax::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  const Type &argument_type = *argument_types.front();
+  const Comparison &greater_comparison =
+      ComparisonFactory::GetComparison(ComparisonID::kGreater);
+  return greater_comparison.canCompareTypes(argument_type, argument_type);
+}
+
+// The result of MAX has the nullable version of the argument's type; nullptr
+// is returned when MAX cannot be applied to the given argument types.
+const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  return canApplyToTypes(argument_types)
+      ? &(argument_types.front()->getNullableVersion())
+      : nullptr;
+}
+
+// Stub: no handle is constructed for MAX yet (see the TODO below), so this
+// currently always returns nullptr. partition_key_types is not used here.
+WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const {
+ // The argument types are only checked through DCHECK.
+ DCHECK(canApplyToTypes(argument_types))
+ << "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
+ << "that MAX can not be applied to.";
+
+ // TODO(Shixuan): Add handle for Max.
+ return nullptr;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
new file mode 100644
index 0000000..00c788e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL MAX() OVER term.
+ *
+ * The constructor is private, so client code obtains the function object
+ * through Instance() rather than constructing it.
+ **/
+class WindowAggregateFunctionMax : public WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the shared instance of this function.
+ *
+ * @return A reference to a function-local static instance, which is
+ * constructed the first time this method is called.
+ **/
+ static const WindowAggregateFunctionMax& Instance() {
+ static WindowAggregateFunctionMax instance;
+ return instance;
+ }
+
+ // Returns the upper-case name "MAX".
+ std::string getName() const override {
+ return "MAX";
+ }
+
+ // True only for exactly one argument whose type can be compared with itself
+ // by '>'.
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // Nullable version of the argument's type, or nullptr if MAX is not
+ // applicable to argument_types.
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // NOTE: the definition in WindowAggregateFunctionMax.cpp is still a stub that
+ // returns nullptr.
+ WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+ // Registers this function with the base class under WindowAggregationID::kMax.
+ WindowAggregateFunctionMax()
+ : WindowAggregateFunction(WindowAggregationID::kMax) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionMax);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MAX_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
new file mode 100644
index 0000000..a13e28e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionMin.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// MIN accepts exactly one argument, and that argument's type must be
+// comparable with itself using '<'.
+bool WindowAggregateFunctionMin::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  const Type &argument_type = *argument_types.front();
+  const Comparison &less_comparison =
+      ComparisonFactory::GetComparison(ComparisonID::kLess);
+  return less_comparison.canCompareTypes(argument_type, argument_type);
+}
+
+// The result of MIN has the nullable version of the argument's type; nullptr
+// is returned when MIN cannot be applied to the given argument types.
+const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  const bool applicable = canApplyToTypes(argument_types);
+  if (applicable) {
+    const Type &argument_type = *argument_types.front();
+    return &(argument_type.getNullableVersion());
+  }
+
+  return nullptr;
+}
+
+// Stub: no handle is constructed for MIN yet, so this currently always returns
+// nullptr. partition_key_types is not used here.
+WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const {
+ // The argument types are only checked through DCHECK.
+ DCHECK(canApplyToTypes(argument_types))
+ << "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
+ << "that MIN can not be applied to.";
+
+ // TODO: Add handle for Min.
+ return nullptr;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
new file mode 100644
index 0000000..aeba539
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL MIN() OVER term.
+ *
+ * The constructor is private, so client code obtains the function object
+ * through Instance() rather than constructing it.
+ **/
+class WindowAggregateFunctionMin : public WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the shared instance of this function.
+ *
+ * @return A reference to a function-local static instance, which is
+ * constructed the first time this method is called.
+ **/
+ static const WindowAggregateFunctionMin& Instance() {
+ static WindowAggregateFunctionMin instance;
+ return instance;
+ }
+
+ // Returns the upper-case name "MIN".
+ std::string getName() const override {
+ return "MIN";
+ }
+
+ // True only for exactly one argument whose type can be compared with itself
+ // by '<'.
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // Nullable version of the argument's type, or nullptr if MIN is not
+ // applicable to argument_types.
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // NOTE: the definition in WindowAggregateFunctionMin.cpp is still a stub that
+ // returns nullptr.
+ WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+ // Registers this function with the base class under WindowAggregationID::kMin.
+ WindowAggregateFunctionMin()
+ : WindowAggregateFunction(WindowAggregationID::kMin) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionMin);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_MIN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
new file mode 100644
index 0000000..636c53a
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionSum.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// SUM accepts exactly one argument, and the add operation must be applicable
+// to two values of that argument's type.
+bool WindowAggregateFunctionSum::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  const Type &argument_type = *argument_types.front();
+  const BinaryOperation &add_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd);
+  return add_operation.canApplyToTypes(argument_type, argument_type);
+}
+
+// Result type of SUM for the given argument types, or nullptr when SUM is not
+// applicable. The result is always nullable, since SUM may return NULL if
+// there are no input rows. Int is widened to Long and Float to Double to give
+// more headroom when adding up many values; any other type is kept as is.
+const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (!canApplyToTypes(argument_types)) {
+    return nullptr;
+  }
+
+  const Type &nullable_argument_type =
+      argument_types.front()->getNullableVersion();
+  switch (nullable_argument_type.getTypeID()) {
+    case kInt:
+      return &TypeFactory::GetType(kLong, true);
+    case kFloat:
+      return &TypeFactory::GetType(kDouble, true);
+    default:
+      return &nullable_argument_type;
+  }
+}
+
+// Stub: no handle is constructed for SUM yet, so this currently always returns
+// nullptr. partition_key_types is not used here.
+WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const {
+ // The argument types are only checked through DCHECK.
+ DCHECK(canApplyToTypes(argument_types))
+ << "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
+ << "that SUM can not be applied to.";
+
+ // TODO: Add handle for Sum.
+ return nullptr;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
new file mode 100644
index 0000000..047113c
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class Type;
+class WindowAggregationHandle;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL SUM() OVER term.
+ *
+ * The constructor is private, so client code obtains the function object
+ * through Instance() rather than constructing it.
+ **/
+class WindowAggregateFunctionSum : public WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the shared instance of this function.
+ *
+ * @return A reference to a function-local static instance, which is
+ * constructed the first time this method is called.
+ **/
+ static const WindowAggregateFunctionSum& Instance() {
+ static WindowAggregateFunctionSum instance;
+ return instance;
+ }
+
+ // Returns the upper-case name "SUM".
+ std::string getName() const override {
+ return "SUM";
+ }
+
+ // True only for exactly one argument whose type can be added to itself.
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // Nullable result type (Int widened to Long, Float widened to Double), or
+ // nullptr if SUM is not applicable to argument_types.
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ // NOTE: the definition in WindowAggregateFunctionSum.cpp is still a stub that
+ // returns nullptr.
+ WindowAggregationHandle* createHandle(
+ const std::vector<const Type*> &argument_types,
+ const std::vector<const Type*> &partition_key_types) const override;
+
+ private:
+ // Registers this function with the base class under WindowAggregationID::kSum.
+ WindowAggregateFunctionSum()
+ : WindowAggregateFunction(WindowAggregationID::kSum) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionSum);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_SUM_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
new file mode 100644
index 0000000..65f95d9
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class InsertDestinationInterface;
+class Scalar;
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregationHandle encapsulates logic for actually computing
+ *        window aggregates with particular argument(s).
+ * @note See also WindowAggregateFunction, which represents a SQL aggregate
+ *       function in the abstract sense.
+ *
+ * A WindowAggregationHandle is created by calling
+ * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
+ * provides methods that are used to actually compute the window aggregate,
+ * storing intermediate results in WindowAggregationState objects.
+ *
+ * The work flow for computing a window aggregate is:
+ *     1. Create an initial state by createInitialState().
+ *     2. One thread will handle all the computation, iterating from the first
+ *        tuple to the last tuple. Note there are two modes, used in
+ *        different situations:
+ *        a. If the window aggregate is defined as accumulative, which are:
+ *           i.  Functions applied to whole partition, such as rank(), ntile()
+ *               and dense_rank().
+ *           ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
+ *               AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
+ *               FOLLOWING".
+ *           Then, for functions except median, we could store some global
+ *           values in the state without keeping all the tuple values around.
+ *        b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ *           3 FOLLOWING", we have to store all the tuples in the state so that
+ *           we could know which values should be dropped as the window slides.
+ *        For each computed value, generate a tuple stored in the column vector.
+ *     3. Insert the new column into the original relation and return.
+ *
+ * NOTE(review): the only computation method this base class declares is
+ * calculate(); createInitialState() and the state objects mentioned above are
+ * not part of the interface declared here.
+ *
+ * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
+ * parallelization is to calculate the partial result inside each block. Each
+ * block could visit the following blocks as long as the block's last partition
+ * is not finished. WindowAggregationOperationState will be used for handling
+ * the global state of the calculation.
+ **/
+
+class WindowAggregationHandle {
+ public:
+  /**
+   * @brief Destructor.
+   **/
+  virtual ~WindowAggregationHandle() {}
+
+  /**
+   * @brief Calculate the window aggregate result.
+   *
+   * @param block_accessors A pointer to the value accessor of block attributes.
+   * @param arguments The ColumnVectors of arguments.
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current row.
+   * @param num_following The number of rows/range that follows the current row.
+   *
+   * @return A ColumnVector of the calculated window aggregates.
+   **/
+  virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                                  std::vector<ColumnVector*> &&arguments,
+                                  const std::vector<attribute_id> &partition_by_ids,
+                                  const bool is_row,
+                                  const std::int64_t num_preceding,
+                                  const std::int64_t num_following) const = 0;
+
+ protected:
+  /**
+   * @brief Constructor.
+   *
+   * Builds one equality comparator per partition key type, in the same order
+   * as partition_key_types.
+   *
+   * @param partition_key_types The Types of the partition key.
+   **/
+  explicit WindowAggregationHandle(
+      const std::vector<const Type*> &partition_key_types) {
+    // Comparison operators for checking if two tuples belong to the same partition.
+    const Comparison &equal_comparison =
+        ComparisonFactory::GetComparison(ComparisonID::kEqual);
+
+    // The final size is known up front, so allocate once.
+    equal_comparators_.reserve(partition_key_types.size());
+    for (const Type *partition_key_type : partition_key_types) {
+      // Take ownership of the new comparator before handing it to the vector,
+      // so that it cannot leak if the insertion fails.
+      equal_comparators_.emplace_back(
+          std::unique_ptr<UncheckedComparator>(
+              equal_comparison.makeUncheckedComparatorForTypes(*partition_key_type,
+                                                               *partition_key_type)));
+    }
+  }
+
+  // One equality comparator per partition key, owned by this handle.
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
new file mode 100644
index 0000000..a6a10d4
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const std::vector<const Type*> &partition_key_types,
+    const Type &type)
+    : WindowAggregationHandle(partition_key_types),
+      argument_type_(type) {
+  // Pick the accumulator type. Int is summed as Long and Float as Double so
+  // that there is more headroom when adding many values; every other type is
+  // summed in its own type.
+  const TypeID argument_type_id = type.getTypeID();
+  TypeID sum_type_id = argument_type_id;
+  if (argument_type_id == kInt || argument_type_id == kLong) {
+    sum_type_id = kLong;
+  } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+    sum_type_id = kDouble;
+  }
+  sum_type_ = &(TypeFactory::GetType(sum_type_id));
+
+  // The sum is always divided by a count expressed as a Double.
+  const Type &count_type = TypeFactory::GetType(kDouble);
+  const auto &divide_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+
+  // The result must be nullable, because AVG() over 0 values (or over only
+  // NULL values) is NULL.
+  result_type_ =
+      &(divide_operation.resultTypeForArgumentTypes(*sum_type_, count_type)
+            ->getNullableVersion());
+
+  // Arithmetic operators used while evaluating a window:
+  // sum + argument, for accumulating the argument values ...
+  fast_add_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+  // ... and sum / count, for producing the final average.
+  divide_operator_.reset(
+      divide_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_, count_type));
+}
+
+// Calculates the windowed average for every tuple reachable through
+// 'tuple_accessor' and returns the results as a newly allocated
+// NativeColumnVector of 'result_type_', one entry per tuple in iteration order.
+//
+// Exactly one native argument column is expected, with as many entries as
+// 'tuple_accessor' has tuples (checked with DCHECKs). The frame parameters are
+// forwarded unchanged to calculateOneWindow() for each tuple.
+ColumnVector* WindowAggregationHandleAvg::calculate(
+    ColumnVectorsValueAccessor *tuple_accessor,
+    std::vector<ColumnVector*> &&arguments,
+    const std::vector<attribute_id> &partition_by_ids,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  DCHECK_EQ(1u, arguments.size());
+  DCHECK(arguments[0]->isNative());
+  DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
+            static_cast<const NativeColumnVector*>(arguments[0])->size());
+
+  // Initialize the output column. It is returned to the caller, so it is the
+  // only object here that has to outlive this function.
+  NativeColumnVector *window_aggregates =
+      new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
+
+  // The argument accessor is only needed while the windows are evaluated, so
+  // it lives on the stack instead of being heap-allocated and never released.
+  // NOTE(review): addColumn() is used with its default ownership setting; if
+  // that default makes the accessor own the column, arguments[0] is released
+  // together with the accessor on return -- confirm that callers hand the
+  // column over (the rvalue 'arguments' parameter suggests they do).
+  ColumnVectorsValueAccessor argument_accessor;
+  argument_accessor.addColumn(arguments[0]);
+
+  // Walk both accessors in lock step, creating a window for each tuple and
+  // appending its aggregate to the output column.
+  tuple_accessor->beginIteration();
+  argument_accessor.beginIteration();
+
+  while (tuple_accessor->next() && argument_accessor.next()) {
+    const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
+                                                                 &argument_accessor,
+                                                                 partition_by_ids,
+                                                                 is_row,
+                                                                 num_preceding,
+                                                                 num_following);
+    window_aggregates->appendTypedValue(window_aggregate);
+  }
+
+  return window_aggregates;
+}
+
+// Calculates the average over the window surrounding the tuple on which both
+// accessors are currently positioned.
+//
+// The window holds the current row, up to 'num_preceding' rows before it and
+// up to 'num_following' rows after it; a bound of -1 means "no limit". In
+// either direction the scan stops at the first row whose partition key differs
+// from the current row's, or at the edge of the data. NULL argument values are
+// skipped. Returns a NULL of 'result_type_' when no non-NULL value was found,
+// otherwise sum / count.
+//
+// NOTE(review): 'is_row' is never consulted here, so RANGE frames are
+// evaluated exactly like ROWS frames.
+TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    ColumnVectorsValueAccessor *tuple_accessor,
+    ColumnVectorsValueAccessor *argument_accessor,
+    const std::vector<attribute_id> &partition_by_ids,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  // Running sum and the number of non-NULL values folded into it.
+  TypedValue sum = sum_type_->makeZeroValue();
+  std::uint64_t count = 0;
+
+  // Folds one value into the running sum, ignoring the value if null.
+  const auto accumulate = [this, &sum, &count](const TypedValue &value) {
+    if (!value.isNull()) {
+      sum = fast_add_operator_->applyToTypedValues(sum, value);
+      ++count;
+    }
+  };
+
+  // The current row always belongs to its own window.
+  accumulate(argument_accessor->getTypedValue(0));
+
+  // Get the partition key for the current row.
+  std::vector<TypedValue> current_row_partition_key;
+  current_row_partition_key.reserve(partition_by_ids.size());
+  for (const attribute_id partition_by_id : partition_by_ids) {
+    current_row_partition_key.push_back(
+        tuple_accessor->getTypedValue(partition_by_id));
+  }
+
+  // Get current position.
+  const tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
+
+  // Find preceding tuples. The counter is 64-bit like the bound it is
+  // compared with, so it cannot overflow before the bound is reached.
+  std::int64_t count_preceding = 0;
+  tuple_id preceding_tuple_id = current_tuple_id;
+  while (num_preceding == -1 || count_preceding < num_preceding) {
+    --preceding_tuple_id;
+
+    // No more preceding tuples.
+    if (preceding_tuple_id < 0) {
+      break;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, stop searching preceding tuples.
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
+                       preceding_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    ++count_preceding;
+    accumulate(
+        argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id));
+  }
+
+  // Find following tuples.
+  std::int64_t count_following = 0;
+  tuple_id following_tuple_id = current_tuple_id;
+  while (num_following == -1 || count_following < num_following) {
+    ++following_tuple_id;
+
+    // No more following tuples.
+    if (following_tuple_id >= tuple_accessor->getNumTuples()) {
+      break;
+    }
+
+    // Get the partition keys and compare. If not the same partition as the
+    // current row, stop searching following tuples.
+    if (!samePartition(tuple_accessor,
+                       current_row_partition_key,
+                       following_tuple_id,
+                       partition_by_ids)) {
+      break;
+    }
+
+    // Actually count the element and do the calculation.
+    ++count_following;
+    accumulate(
+        argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id));
+  }
+
+  // If all values are NULLs, return NULL; otherwise, return the quotient.
+  if (count == 0) {
+    return result_type_->makeNullValue();
+  }
+  return divide_operator_->applyToTypedValues(
+      sum, TypedValue(static_cast<double>(count)));
+}
+
+// Returns true iff the tuple at absolute position 'boundary_tuple_id' has the
+// same partition key as the current row.
+//
+// 'current_row_partition_key' holds the current row's key values in the same
+// order as 'partition_by_ids'; the i-th pair of values is compared with the
+// i-th equality comparator built by the base class constructor. With no
+// partition attributes the loop body never runs and the result is true.
+bool WindowAggregationHandleAvg::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const std::vector<TypedValue> &current_row_partition_key,
+    const tuple_id boundary_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  for (std::size_t partition_by_index = 0;
+       partition_by_index < partition_by_ids.size();
+       ++partition_by_index) {
+    // A single differing attribute is enough to place the tuple in another
+    // partition.
+    if (!equal_comparators_[partition_by_index]->compareTypedValues(
+            current_row_partition_key[partition_by_index],
+            tuple_accessor->getTypedValueAtAbsolutePosition(
+                partition_by_ids[partition_by_index], boundary_tuple_id))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
new file mode 100644
index 0000000..5b41779
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class ColumnVectorsValueAccessor;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A WindowAggregationHandle for average.
+ **/
+class WindowAggregationHandleAvg : public WindowAggregationHandle {
+ public:
+  ~WindowAggregationHandleAvg() override {}
+
+  /**
+   * @brief Calculate the windowed average for every tuple.
+   *
+   * @param block_accessors A pointer to the value accessor of block attributes.
+   * @param arguments The ColumnVectors of arguments. Exactly one native
+   *        column with one entry per tuple is expected.
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *        row; -1 means no limit.
+   * @param num_following The number of rows/range that follows the current
+   *        row; -1 means no limit.
+   *
+   * @return A newly allocated ColumnVector of the calculated averages, one
+   *         per tuple.
+   **/
+  ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+                          std::vector<ColumnVector*> &&arguments,
+                          const std::vector<attribute_id> &partition_by_ids,
+                          const bool is_row,
+                          const std::int64_t num_preceding,
+                          const std::int64_t num_following) const override;
+
+ private:
+  friend class WindowAggregateFunctionAvg;
+
+  /**
+   * @brief Constructor.
+   *
+   * @param partition_key_types The Types of the partition key.
+   * @param type Type of the avg value.
+   **/
+  WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
+                             const Type &type);
+
+  /**
+   * @brief Calculate the average for the window around the tuple on which
+   *        both accessors are currently positioned.
+   *
+   * @param tuple_accessor Accessor over the tuples, used to read partition
+   *        keys and the current position.
+   * @param argument_accessor Accessor over the argument column (attribute 0).
+   * @param partition_by_ids The ids of partition keys.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding Maximum number of preceding rows in the window; -1
+   *        means no limit.
+   * @param num_following Maximum number of following rows in the window; -1
+   *        means no limit.
+   *
+   * @return The average of the non-NULL values in the window, or NULL if
+   *         there are none.
+   **/
+  TypedValue calculateOneWindow(
+      ColumnVectorsValueAccessor *tuple_accessor,
+      ColumnVectorsValueAccessor *argument_accessor,
+      const std::vector<attribute_id> &partition_by_ids,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const;
+
+  /**
+   * @brief Check whether a tuple belongs to the same partition as the
+   *        current row.
+   *
+   * @param tuple_accessor Accessor over the tuples.
+   * @param current_row_partition_key Partition key values of the current row,
+   *        in the same order as partition_by_ids.
+   * @param boundary_tuple_id Absolute position of the tuple to check.
+   * @param partition_by_ids The ids of partition keys.
+   *
+   * @return True if every partition key value of the tuple equals the
+   *         corresponding value of the current row.
+   **/
+  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+                     const std::vector<TypedValue> &current_row_partition_key,
+                     const tuple_id boundary_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  // Type of the values being averaged.
+  const Type &argument_type_;
+  // Type used for the running sum (Long for Int/Long, Double for
+  // Float/Double, otherwise the argument's own type).
+  const Type *sum_type_;
+  // Nullable type of the average produced by dividing the sum by a Double.
+  const Type *result_type_;
+  // Adds an argument value to the running sum.
+  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  // Divides the sum by the count to produce the final average.
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/WindowAggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationID.hpp b/expressions/window_aggregation/WindowAggregationID.hpp
new file mode 100644
index 0000000..8122df3
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationID.hpp
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief The possible types of window aggregations.
+ *
+ * @note The enumerators have no explicit values, so they take consecutive
+ *       values starting at 0 in declaration order (kAvg = 0 ... kSum = 4).
+ *       NOTE(review): if these IDs are ever persisted or serialized,
+ *       reordering or inserting enumerators would change their values --
+ *       confirm before doing so.
+ **/
+enum class WindowAggregationID {
+ kAvg,
+ kCount,
+ kMin,
+ kMax,
+ kSum
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
[2/4] incubator-quickstep git commit: Introduced PolicyEnforcerBase
and its single-node implementation.
Posted by zu...@apache.org.
Introduced PolicyEnforcerBase and its single-node implementation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2b78380e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2b78380e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2b78380e
Branch: refs/heads/SQL-window-aggregation
Commit: 2b78380ec5689658c909ba63241b78f72f4644bd
Parents: cdecf0b
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Jul 18 12:18:04 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Wed Jul 20 09:12:13 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 23 +-
query_execution/ForemanSingleNode.cpp | 6 +-
query_execution/ForemanSingleNode.hpp | 7 +-
query_execution/PolicyEnforcer.cpp | 254 ----------------------
query_execution/PolicyEnforcer.hpp | 223 -------------------
query_execution/PolicyEnforcerBase.cpp | 177 +++++++++++++++
query_execution/PolicyEnforcerBase.hpp | 196 +++++++++++++++++
query_execution/PolicyEnforcerSingleNode.cpp | 110 ++++++++++
query_execution/PolicyEnforcerSingleNode.hpp | 104 +++++++++
9 files changed, 612 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8c12a5d..f582ba5 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -34,7 +34,8 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
-add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
${queryexecution_QueryContext_proto_srcs}
@@ -85,7 +86,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
- quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_WorkerDirectory
@@ -95,22 +96,29 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
-target_link_libraries(quickstep_queryexecution_PolicyEnforcer
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
glog
quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
- quickstep_catalog_CatalogTypedefs
quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManagerBase
+ quickstep_relationaloperators_WorkOrder
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryManagerBase
quickstep_queryexecution_QueryManagerSingleNode
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
- quickstep_relationaloperators_WorkOrder
- quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
@@ -244,7 +252,8 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_ForemanSingleNode
- quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 3aa1f0b..cda02a7 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkerDirectory.hpp"
@@ -36,6 +37,7 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
@@ -46,6 +48,8 @@ using std::vector;
namespace quickstep {
+class QueryHandle;
+
DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
"of pending work orders for the worker. This information is used "
"by the Foreman to assign work orders to worker threads");
@@ -88,7 +92,7 @@ ForemanSingleNode::ForemanSingleNode(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_.reset(new PolicyEnforcer(
+ policy_enforcer_.reset(new PolicyEnforcerSingleNode(
foreman_client_id_,
num_numa_nodes,
catalog_database_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 7506d35..caef5e0 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -24,11 +24,12 @@
#include <vector>
#include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; }
namespace quickstep {
@@ -128,7 +129,7 @@ class ForemanSingleNode final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
- std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+ std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
deleted file mode 100644
index 4cba8c5..0000000
--- a/query_execution/PolicyEnforcer.cpp
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin\u2014Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/PolicyEnforcer.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/QueryManagerSingleNode.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-namespace quickstep {
-
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
- " can be allocated in a single round of dispatch of messages to"
- " the workers.");
-
-bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- if (admitted_queries_.size() < kMaxConcurrentQueries) {
- // Ok to admit the query.
- const std::size_t query_id = query_handle->query_id();
- if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
- // Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
- return true;
- } else {
- LOG(ERROR) << "Query with the same ID " << query_id << " exists";
- return false;
- }
- } else {
- // This query will have to wait.
- waiting_queries_.push(query_handle);
- return false;
- }
-}
-
-void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
- std::size_t query_id;
- QueryManagerBase::dag_node_index op_index;
-
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
- // Note: This proto message contains the time it took to execute the
- // WorkOrder. It can be accessed in this scope.
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- worker_directory_->decrementNumQueuedWorkOrders(
- proto.worker_thread_index());
- if (profile_individual_workorders_) {
- recordTimeForWorkOrder(proto);
- }
-
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
- // Note: This proto message contains the time it took to execute the
- // rebuild WorkOrder. It can be accessed in this scope.
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- worker_directory_->decrementNumQueuedWorkOrders(
- proto.worker_thread_index());
-
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
- break;
- }
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(
- proto.partition_id(), block);
- }
- return;
- }
- case kDataPipelineMessage: {
- serialization::DataPipelineMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
- admitted_queries_[query_id]->processDataPipelineMessage(
- op_index, proto.block_id(), proto.relation_id());
- break;
- }
- case kWorkOrdersAvailableMessage: {
- serialization::WorkOrdersAvailableMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
-
- // Check if new work orders are available.
- admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
- break;
- }
- case kWorkOrderFeedbackMessage: {
- WorkOrder::FeedbackMessage msg(
- const_cast<void *>(tagged_message.message()),
- tagged_message.message_bytes());
- query_id = msg.header().query_id;
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = msg.header().rel_op_index;
- admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
- break;
- }
- default:
- LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
- }
- if (admitted_queries_[query_id]->queryStatus(op_index) ==
- QueryManagerBase::QueryStatusCode::kQueryExecuted) {
- removeQuery(query_id);
- if (!waiting_queries_.empty()) {
- // Admit the earliest waiting query.
- QueryHandle *new_query = waiting_queries_.front();
- waiting_queries_.pop();
- admitQuery(new_query);
- }
- }
-}
-
-void PolicyEnforcer::getWorkerMessages(
- std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
- // Iterate over admitted queries until either there are no more
- // messages available, or the maximum number of messages have
- // been collected.
- DCHECK(worker_messages->empty());
- // TODO(harshad) - Make this function generic enough so that it
- // works well when multiple queries are getting executed.
- std::size_t per_query_share = 0;
- if (!admitted_queries_.empty()) {
- per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
- } else {
- LOG(WARNING) << "Requesting WorkerMessages when no query is running";
- return;
- }
- DCHECK_GT(per_query_share, 0u);
- std::vector<std::size_t> finished_queries_ids;
-
- for (const auto &admitted_query_info : admitted_queries_) {
- QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
- DCHECK(curr_query_manager != nullptr);
- std::size_t messages_collected_curr_query = 0;
- while (messages_collected_curr_query < per_query_share) {
- WorkerMessage *next_worker_message =
- static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
- if (next_worker_message != nullptr) {
- ++messages_collected_curr_query;
- worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
- } else {
- // No more work ordes from the current query at this time.
- // Check if the query's execution is over.
- if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- // If the query has been executed, remove it.
- finished_queries_ids.push_back(admitted_query_info.first);
- }
- break;
- }
- }
- }
- for (const std::size_t finished_qid : finished_queries_ids) {
- removeQuery(finished_qid);
- }
-}
-
-void PolicyEnforcer::removeQuery(const std::size_t query_id) {
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
- LOG(WARNING) << "Removing query with ID " << query_id
- << " that hasn't finished its execution";
- }
- admitted_queries_.erase(query_id);
-}
-
-bool PolicyEnforcer::admitQueries(
- const std::vector<QueryHandle*> &query_handles) {
- for (QueryHandle *curr_query : query_handles) {
- if (!admitQuery(curr_query)) {
- return false;
- }
- }
- return true;
-}
-
-void PolicyEnforcer::recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto) {
- const std::size_t query_id = proto.query_id();
- if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
- workorder_time_recorder_[query_id];
- }
- workorder_time_recorder_[query_id].emplace_back(
- proto.worker_thread_index(),
- proto.operator_index(),
- proto.execution_time_in_microseconds());
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
deleted file mode 100644
index 8bd6d92..0000000
--- a/query_execution/PolicyEnforcer.hpp
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin—Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <tuple>
-#include <unordered_map>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerDirectory;
-
-namespace serialization { class NormalWorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class that ensures that a high level policy is maintained
- * in sharing resources among concurrent queries.
- **/
-class PolicyEnforcer {
- public:
- /**
- * @brief Constructor.
- *
- * @param foreman_client_id The TMB client ID of the Foreman.
- * @param num_numa_nodes Number of NUMA nodes used by the system.
- * @param catalog_database The CatalogDatabase used.
- * @param storage_manager The StorageManager used.
- * @param bus The TMB.
- **/
- PolicyEnforcer(const tmb::client_id foreman_client_id,
- const std::size_t num_numa_nodes,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- const bool profile_individual_workorders = false)
- : foreman_client_id_(foreman_client_id),
- num_numa_nodes_(num_numa_nodes),
- catalog_database_(catalog_database),
- storage_manager_(storage_manager),
- worker_directory_(worker_directory),
- bus_(bus),
- profile_individual_workorders_(profile_individual_workorders) {}
-
- /**
- * @brief Destructor.
- **/
- ~PolicyEnforcer() {
- if (hasQueries()) {
- LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
- "waiting queries";
- }
- }
-
- /**
- * @brief Admit a query to the system.
- *
- * @param query_handle The QueryHandle for the new query.
- *
- * @return Whether the query was admitted to the system.
- **/
- bool admitQuery(QueryHandle *query_handle);
-
- /**
- * @brief Admit multiple queries in the system.
- *
- * @note In the current simple implementation, we only allow one active
- * query in the system. Other queries will have to wait.
- *
- * @param query_handles A vector of QueryHandles for the queries to be
- * admitted.
- *
- * @return True if all the queries were admitted, false if at least one query
- * was not admitted.
- **/
- bool admitQueries(const std::vector<QueryHandle*> &query_handles);
-
- /**
- * @brief Remove a given query that is under execution.
- *
- * @note This function is made public so that it is possible for a query
- * to be killed. Otherwise, it should only be used privately by the
- * class.
- *
- * TODO(harshad) - Extend this function to support removal of waiting queries.
- *
- * @param query_id The ID of the query to be removed.
- **/
- void removeQuery(const std::size_t query_id);
-
- /**
- * @brief Get worker messages to be dispatched. These worker messages come
- * from the active queries.
- *
- * @param worker_messages The worker messages to be dispatched.
- **/
- void getWorkerMessages(
- std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
-
- /**
- * @brief Process a message sent to the Foreman, which gets passed on to the
- * policy enforcer.
- *
- * @param message The message.
- **/
- void processMessage(const TaggedMessage &tagged_message);
-
- /**
- * @brief Check if there are any queries to be executed.
- *
- * @return True if there is at least one active or waiting query, false if
- * the policy enforcer doesn't have any query.
- **/
- inline bool hasQueries() const {
- return !(admitted_queries_.empty() && waiting_queries_.empty());
- }
-
- /**
- * @brief Get the profiling results for individual work order execution for a
- * given query.
- *
- * @note This function should only be called if profiling individual work
- * orders option is enabled.
- *
- * @param query_id The ID of the query for which the profiling results are
- * requested.
- *
- * @return A vector of tuples, each being a single profiling entry.
- **/
- inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
- getProfilingResults(const std::size_t query_id) const {
- DCHECK(profile_individual_workorders_);
- DCHECK(workorder_time_recorder_.find(query_id) !=
- workorder_time_recorder_.end());
- return workorder_time_recorder_.at(query_id);
- }
-
- private:
- static constexpr std::size_t kMaxConcurrentQueries = 1;
-
- /**
- * @brief Record the execution time for a finished WorkOrder.
- *
- * TODO(harshad) - Extend the functionality to rebuild work orders.
- *
- * @param proto The completion message proto sent after the WorkOrder
- * execution.
- **/
- void recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto);
-
- const tmb::client_id foreman_client_id_;
- const std::size_t num_numa_nodes_;
-
- CatalogDatabaseLite *catalog_database_;
- StorageManager *storage_manager_;
- WorkerDirectory *worker_directory_;
-
- tmb::MessageBus *bus_;
- const bool profile_individual_workorders_;
-
- // Key = query ID, value = QueryManagerBase* for the key query.
- std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
-
- // The queries which haven't been admitted yet.
- std::queue<QueryHandle*> waiting_queries_;
-
- // Key = Query ID.
- // Value = A tuple indicating a record of executing a work order.
- // Within a tuple ...
- // 1st element: Logical worker ID.
- // 2nd element: Operator ID.
- // 3rd element: Time in microseconds to execute the work order.
- std::unordered_map<
- std::size_t,
- std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
- workorder_time_recorder_;
-
- DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
new file mode 100644
index 0000000..d16a502
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
+ std::size_t query_id;
+ QueryManagerBase::dag_node_index op_index;
+
+ switch (tagged_message.message_type()) {
+ case kWorkOrderCompleteMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+ if (profile_individual_workorders_) {
+ recordTimeForWorkOrder(proto);
+ }
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
+ break;
+ }
+ case kRebuildWorkOrderCompleteMessage: {
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // rebuild WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ decrementNumQueuedWorkOrders(proto.worker_thread_index());
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: {
+ serialization::CatalogRelationNewBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+
+ const block_id block = proto.block_id();
+
+ CatalogRelation *relation =
+ static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+ relation->addBlock(block);
+
+ if (proto.has_partition_id()) {
+ relation->getPartitionSchemeMutable()->addBlockToPartition(
+ proto.partition_id(), block);
+ }
+ return;
+ }
+ case kDataPipelineMessage: {
+ serialization::DataPipelineMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processDataPipelineMessage(
+ op_index, proto.block_id(), proto.relation_id());
+ break;
+ }
+ case kWorkOrdersAvailableMessage: {
+ serialization::WorkOrdersAvailableMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+
+ // Check if new work orders are available.
+ admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
+ break;
+ }
+ case kWorkOrderFeedbackMessage: {
+ WorkOrder::FeedbackMessage msg(
+ const_cast<void *>(tagged_message.message()),
+ tagged_message.message_bytes());
+ query_id = msg.header().query_id;
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = msg.header().rel_op_index;
+ admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+ }
+ if (admitted_queries_[query_id]->queryStatus(op_index) ==
+ QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+ LOG(WARNING) << "Removing query with ID " << query_id
+ << " that hasn't finished its execution";
+ }
+ admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcerBase::admitQueries(
+ const std::vector<QueryHandle*> &query_handles) {
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void PolicyEnforcerBase::recordTimeForWorkOrder(
+ const serialization::NormalWorkOrderCompletionMessage &proto) {
+ const std::size_t query_id = proto.query_id();
+ if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
+ workorder_time_recorder_[query_id];
+ }
+ workorder_time_recorder_[query_id].emplace_back(
+ proto.worker_thread_index(),
+ proto.operator_index(),
+ proto.execution_time_in_microseconds());
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
new file mode 100644
index 0000000..0482ebc
--- /dev/null
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -0,0 +1,196 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A base class that ensures that a high level policy is maintained
+ * in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param catalog_database The CatalogDatabase used.
+ * @param profile_individual_workorders If true, profile each normal work order.
+ **/
+ PolicyEnforcerBase(CatalogDatabaseLite *catalog_database,
+ const bool profile_individual_workorders)
+ : catalog_database_(catalog_database),
+ profile_individual_workorders_(profile_individual_workorders) {}
+
+ /**
+ * @brief Virtual Destructor.
+ **/
+ virtual ~PolicyEnforcerBase() {
+ if (hasQueries()) {
+ LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+ "waiting queries";
+ }
+ }
+
+ /**
+ * @brief Admit multiple queries in the system.
+ *
+ * @note In the current simple implementation, we only allow one active
+ * query in the system. Other queries will have to wait.
+ *
+ * @param query_handles A vector of QueryHandles for the queries to be
+ * admitted.
+ *
+ * @return True if all the queries were admitted, false if at least one query
+ * was not admitted.
+ **/
+ bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+ /**
+ * @brief Remove a given query that is under execution.
+ *
+ * @note This function is made public so that it is possible for a query
+ * to be killed. Otherwise, it should only be used privately by the
+ * class.
+ *
+ * TODO(harshad) - Extend this function to support removal of waiting queries.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
+ void removeQuery(const std::size_t query_id);
+
+ /**
+ * @brief Process a message sent to the Foreman, which gets passed on to the
+ * policy enforcer.
+ *
+ * @param message The message.
+ **/
+ void processMessage(const TaggedMessage &tagged_message);
+
+ /**
+ * @brief Check if there are any queries to be executed.
+ *
+ * @return True if there is at least one active or waiting query, false if
+ * the policy enforcer doesn't have any query.
+ **/
+ inline bool hasQueries() const {
+ return !(admitted_queries_.empty() && waiting_queries_.empty());
+ }
+
+ /**
+ * @brief Get the profiling results for individual work order execution for a
+ * given query.
+ *
+ * @note This function should only be called if profiling individual work
+ * orders option is enabled.
+ *
+ * @param query_id The ID of the query for which the profiling results are
+ * requested.
+ *
+ * @return A vector of tuples, each being a single profiling entry.
+ **/
+ inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+ getProfilingResults(const std::size_t query_id) const {
+ DCHECK(profile_individual_workorders_);
+ DCHECK(workorder_time_recorder_.find(query_id) !=
+ workorder_time_recorder_.end());
+ return workorder_time_recorder_.at(query_id);
+ }
+
+ protected:
+ static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+ /**
+ * @brief Record the execution time for a finished WorkOrder.
+ *
+ * TODO(harshad) - Extend the functionality to rebuild work orders.
+ *
+ * @param proto The completion message proto sent after the WorkOrder
+ * execution.
+ **/
+ void recordTimeForWorkOrder(
+ const serialization::NormalWorkOrderCompletionMessage &proto);
+
+ CatalogDatabaseLite *catalog_database_;
+
+ const bool profile_individual_workorders_;
+
+ // Key = query ID, value = QueryManagerBase* for the key query.
+ std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
+
+ // The queries which haven't been admitted yet.
+ std::queue<QueryHandle*> waiting_queries_;
+
+ // Key = Query ID.
+ // Value = A tuple indicating a record of executing a work order.
+ // Within a tuple ...
+ // 1st element: Logical worker ID.
+ // 2nd element: Operator ID.
+ // 3rd element: Time in microseconds to execute the work order.
+ std::unordered_map<
+ std::size_t,
+ std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+ workorder_time_recorder_;
+
+ private:
+ /**
+ * @brief Admit a query to the system.
+ *
+ * @param query_handle The QueryHandle for the new query.
+ *
+ * @return Whether the query was admitted to the system.
+ **/
+ virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
+ /**
+ * @brief Decrement the number of queued workorders for the given worker by 1.
+ *
+ * @param worker_index The logical ID of the given worker.
+ **/
+ virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
new file mode 100644
index 0000000..549e39f
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+ " can be allocated in a single round of dispatch of messages to"
+ " the workers.");
+
+void PolicyEnforcerSingleNode::getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+ // Iterate over admitted queries until either there are no more
+ // messages available, or the maximum number of messages have
+ // been collected.
+ DCHECK(worker_messages->empty());
+ // TODO(harshad) - Make this function generic enough so that it
+ // works well when multiple queries are getting executed.
+ std::size_t per_query_share = 0;
+ if (!admitted_queries_.empty()) {
+ per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+ } else {
+ LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ return;
+ }
+ DCHECK_GT(per_query_share, 0u);
+ std::vector<std::size_t> finished_queries_ids;
+
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+ DCHECK(curr_query_manager != nullptr);
+ std::size_t messages_collected_curr_query = 0;
+ while (messages_collected_curr_query < per_query_share) {
+ WorkerMessage *next_worker_message =
+ static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
+ if (next_worker_message != nullptr) {
+ ++messages_collected_curr_query;
+ worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ } else {
+ // No more work ordes from the current query at this time.
+ // Check if the query's execution is over.
+ if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ // If the query has been executed, remove it.
+ finished_queries_ids.push_back(admitted_query_info.first);
+ }
+ break;
+ }
+ }
+ }
+ for (const std::size_t finished_qid : finished_queries_ids) {
+ removeQuery(finished_qid);
+ }
+}
+
+bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
+ if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+ // Ok to admit the query.
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+ // Query with the same ID not present, ok to admit.
+ admitted_queries_[query_id].reset(
+ new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+ catalog_database_, storage_manager_, bus_));
+ return true;
+ } else {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+ return false;
+ }
+ } else {
+ // This query will have to wait.
+ waiting_queries_.push(query_handle);
+ return false;
+ }
+}
+
+void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(const std::size_t worker_index) {
+ worker_directory_->decrementNumQueuedWorkOrders(worker_index);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b78380e/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
new file mode 100644
index 0000000..671fd83
--- /dev/null
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ * in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param foreman_client_id The TMB client ID of the Foreman.
+ * @param num_numa_nodes Number of NUMA nodes used by the system.
+ * @param catalog_database The CatalogDatabase used.
+ * @param storage_manager The StorageManager used.
+ * @param bus The TMB.
+ **/
+ PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id,
+ const std::size_t num_numa_nodes,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ const bool profile_individual_workorders = false)
+ : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+ foreman_client_id_(foreman_client_id),
+ num_numa_nodes_(num_numa_nodes),
+ storage_manager_(storage_manager),
+ worker_directory_(worker_directory),
+ bus_(bus) {}
+
+ /**
+ * @brief Destructor.
+ **/
+ ~PolicyEnforcerSingleNode() override {}
+
+ bool admitQuery(QueryHandle *query_handle) override;
+
+ /**
+ * @brief Get worker messages to be dispatched. These worker messages come
+ * from the active queries.
+ *
+ * @param worker_messages The worker messages to be dispatched.
+ **/
+ void getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ private:
+ void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+
+ const tmb::client_id foreman_client_id_;
+ const std::size_t num_numa_nodes_;
+
+ StorageManager *storage_manager_;
+ WorkerDirectory *worker_directory_;
+
+ tmb::MessageBus *bus_;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_