You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/08 16:15:29 UTC
[1/2] incubator-quickstep git commit: Renamed Foreman related
classes. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation 9a5ecd288 -> 7671a5893 (forced update)
Renamed Foreman related classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/303e51ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/303e51ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/303e51ee
Branch: refs/heads/SQL-window-aggregation
Commit: 303e51eeb507d0270c15013f8bb30da2cf6c8314
Parents: 04c8224
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Jul 7 14:13:19 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jul 7 14:18:46 2016 -0500
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
cli/CMakeLists.txt | 1 -
cli/CommandExecutor.cpp | 1 -
cli/CommandExecutor.hpp | 1 -
cli/QuickstepCli.cpp | 21 +-
cli/tests/CMakeLists.txt | 2 +-
cli/tests/CommandExecutorTestRunner.cpp | 2 +-
cli/tests/CommandExecutorTestRunner.hpp | 15 +-
query_execution/CMakeLists.txt | 28 +-
query_execution/Foreman.cpp | 255 ------------------
query_execution/Foreman.hpp | 140 ----------
query_execution/ForemanBase.hpp | 85 ++++++
query_execution/ForemanLite.hpp | 85 ------
query_execution/ForemanSingleNode.cpp | 256 +++++++++++++++++++
query_execution/ForemanSingleNode.hpp | 140 ++++++++++
query_optimizer/tests/CMakeLists.txt | 2 +-
.../tests/ExecutionGeneratorTestRunner.cpp | 2 +-
.../tests/ExecutionGeneratorTestRunner.hpp | 15 +-
18 files changed, 527 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20e1fb9..0e5d3de 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -748,7 +748,7 @@ target_link_libraries(quickstep_cli_shell
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..9637055 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -89,7 +89,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
- quickstep_queryexecution_Foreman
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryPlan
quickstep_queryoptimizer_QueryProcessor
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7083ef5..8acfae8 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -34,7 +34,6 @@
#include "parser/ParseStatement.hpp"
#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/QueryProcessor.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index 3435aeb..19d03e6 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -32,7 +32,6 @@ namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogDatabase;
-class Foreman;
class ParseStatement;
class QueryProcessor;
class StorageManager;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 02a55a0..68a3599 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -58,7 +58,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
#include "parser/ParseStatement.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
@@ -104,7 +104,7 @@ using quickstep::AdmitRequestMessage;
using quickstep::CatalogRelation;
using quickstep::DefaultsConfigurator;
using quickstep::DropRelation;
-using quickstep::Foreman;
+using quickstep::ForemanSingleNode;
using quickstep::InputParserUtil;
using quickstep::MessageBusImpl;
using quickstep::MessageStyle;
@@ -353,14 +353,15 @@ int main(int argc, char* argv[]) {
worker_client_ids,
worker_numa_nodes);
- Foreman foreman(main_thread_client_id,
- &worker_directory,
- &bus,
- query_processor->getDefaultDatabase(),
- query_processor->getStorageManager(),
- -1, // Don't pin the Foreman thread.
- num_numa_nodes_system,
- quickstep::FLAGS_profile_and_report_workorder_perf);
+ ForemanSingleNode foreman(
+ main_thread_client_id,
+ &worker_directory,
+ &bus,
+ query_processor->getDefaultDatabase(),
+ query_processor->getStorageManager(),
+ -1, // Don't pin the Foreman thread.
+ num_numa_nodes_system,
+ quickstep::FLAGS_profile_and_report_workorder_perf);
// Start the worker threads.
for (Worker &worker : workers) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index d177d6c..7da56d1 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -33,7 +33,7 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 794f7e1..bd7082f 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -27,7 +27,7 @@
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/cli/tests/CommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp
index 8fb5b65..69692ae 100644
--- a/cli/tests/CommandExecutorTestRunner.hpp
+++ b/cli/tests/CommandExecutorTestRunner.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
@@ -77,11 +77,12 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_.reset(new Foreman(main_thread_client_id_,
- workers_.get(),
- &bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ foreman_.reset(
+ new ForemanSingleNode(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
foreman_->start();
worker_->start();
@@ -104,7 +105,7 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
tmb::client_id main_thread_client_id_;
MessageBusImpl bus_;
- std::unique_ptr<Foreman> foreman_;
+ std::unique_ptr<ForemanSingleNode> foreman_;
std::unique_ptr<Worker> worker_;
std::unique_ptr<WorkerDirectory> workers_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..2be451c 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,6 +1,6 @@
# Copyright 2011-2015 Quickstep Technologies LLC.
# Copyright 2015-2016 Pivotal Software, Inc.
-# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
# University of Wisconsin\u2014Madison.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,8 +32,8 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
-add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
-add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
@@ -69,11 +69,15 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_Macros
tmb)
endif()
-target_link_libraries(quickstep_queryexecution_Foreman
- ${GFLAGS_LIB_NAME}
+target_link_libraries(quickstep_queryexecution_ForemanBase
+ glog
+ quickstep_threading_Thread
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
@@ -82,12 +86,8 @@ target_link_libraries(quickstep_queryexecution_Foreman
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
- tmb)
-target_link_libraries(quickstep_queryexecution_ForemanLite
- glog
- quickstep_threading_Thread
- quickstep_utility_Macros
- tmb)
+ tmb
+ ${GFLAGS_LIB_NAME})
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
@@ -199,8 +199,8 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
- quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
deleted file mode 100644
index 98146e2..0000000
--- a/query_execution/Foreman.cpp
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/Foreman.hpp"
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
-#include "utility/Macros.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-namespace quickstep {
-
-DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
- "of pending work orders for the worker. This information is used "
- "by the Foreman to assign work orders to worker threads");
-
-Foreman::Foreman(const tmb::client_id main_thread_client_id,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- const int cpu_id,
- const size_t num_numa_nodes,
- const bool profile_individual_workorders)
- : ForemanLite(bus, cpu_id),
- main_thread_client_id_(main_thread_client_id),
- worker_directory_(DCHECK_NOTNULL(worker_directory)),
- catalog_database_(DCHECK_NOTNULL(catalog_database)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
- const std::vector<QueryExecutionMessageType> sender_message_types{
- kPoisonMessage,
- kRebuildWorkOrderMessage,
- kWorkOrderMessage,
- kWorkloadCompletionMessage};
-
- for (const auto message_type : sender_message_types) {
- bus_->RegisterClientAsSender(foreman_client_id_, message_type);
- }
-
- const std::vector<QueryExecutionMessageType> receiver_message_types{
- kAdmitRequestMessage,
- kCatalogRelationNewBlockMessage,
- kDataPipelineMessage,
- kPoisonMessage,
- kRebuildWorkOrderCompleteMessage,
- kWorkOrderFeedbackMessage,
- kWorkOrdersAvailableMessage,
- kWorkOrderCompleteMessage};
-
- for (const auto message_type : receiver_message_types) {
- bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
- }
-
- policy_enforcer_.reset(new PolicyEnforcer(
- foreman_client_id_,
- num_numa_nodes,
- catalog_database_,
- storage_manager_,
- worker_directory_,
- bus_,
- profile_individual_workorders));
-}
-
-void Foreman::run() {
- if (cpu_id_ >= 0) {
- // We can pin the foreman thread to a CPU if specified.
- ThreadUtil::BindToCPU(cpu_id_);
- }
-
- // Event loop
- for (;;) {
- // Receive() causes this thread to sleep until next message is received.
- const AnnotatedMessage annotated_msg =
- bus_->Receive(foreman_client_id_, 0, true);
- const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- const tmb::message_type_id message_type = tagged_message.message_type();
- switch (message_type) {
- case kCatalogRelationNewBlockMessage: // Fall through
- case kDataPipelineMessage:
- case kRebuildWorkOrderCompleteMessage:
- case kWorkOrderCompleteMessage:
- case kWorkOrderFeedbackMessage:
- case kWorkOrdersAvailableMessage: {
- policy_enforcer_->processMessage(tagged_message);
- break;
- }
-
- case kAdmitRequestMessage: {
- const AdmitRequestMessage *msg =
- static_cast<const AdmitRequestMessage *>(tagged_message.message());
- const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
-
- DCHECK(!query_handles.empty());
- bool all_queries_admitted = true;
- if (query_handles.size() == 1u) {
- all_queries_admitted =
- policy_enforcer_->admitQuery(query_handles.front());
- } else {
- all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
- }
- if (!all_queries_admitted) {
- LOG(WARNING) << "The scheduler could not admit all the queries";
- // TODO(harshad) - Inform the main thread about the failure.
- }
- break;
- }
- case kPoisonMessage: {
- if (policy_enforcer_->hasQueries()) {
- LOG(WARNING) << "Foreman thread exiting while some queries are "
- "under execution or waiting to be admitted";
- }
- return;
- }
- default:
- LOG(FATAL) << "Unknown message type to Foreman";
- }
-
- if (canCollectNewMessages(message_type)) {
- vector<unique_ptr<WorkerMessage>> new_messages;
- policy_enforcer_->getWorkerMessages(&new_messages);
- dispatchWorkerMessages(new_messages);
- }
-
- // We check again, as some queries may produce zero work orders and finish
- // their execution.
- if (!policy_enforcer_->hasQueries()) {
- // Signal the main thread that there are no queries to be executed.
- // Currently the message doesn't have any real content.
- const int dummy_payload = 0;
- TaggedMessage completion_tagged_message(
- &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(
- bus_,
- foreman_client_id_,
- main_thread_client_id_,
- move(completion_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to main thread with TMB client ID"
- << main_thread_client_id_;
- }
- }
-}
-
-bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
- if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
- kCatalogRelationNewBlockMessage,
- kWorkOrderFeedbackMessage)) {
- return false;
- } else if (worker_directory_->getLeastLoadedWorker().second <=
- FLAGS_min_load_per_worker) {
- // If the least loaded worker has only one pending work order, we should
- // collect new messages and dispatch them.
- return true;
- } else {
- return false;
- }
-}
-
-void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
- for (const auto &message : messages) {
- DCHECK(message != nullptr);
- const int recipient_worker_thread_index = message->getRecipientHint();
- if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
- sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
- *message);
- worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
- } else {
- const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
- sendWorkerMessage(least_loaded_worker_thread_index, *message);
- worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
- }
- }
-}
-
-void Foreman::sendWorkerMessage(const size_t worker_thread_index,
- const WorkerMessage &message) {
- tmb::message_type_id type;
- if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
- type = kRebuildWorkOrderMessage;
- } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
- type = kWorkOrderMessage;
- } else {
- FATAL_ERROR("Invalid WorkerMessageType");
- }
- TaggedMessage worker_tagged_message(&message, sizeof(message), type);
-
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- worker_directory_->getClientID(worker_thread_index),
- move(worker_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to Foreman with TMB client ID "
- << worker_directory_->getClientID(worker_thread_index);
-}
-
-void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const {
- const std::vector<
- std::tuple<std::size_t, std::size_t, std::size_t>>
- &recorded_times = policy_enforcer_->getProfilingResults(query_id);
- fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
- for (auto workorder_entry : recorded_times) {
- // Note: Index of the "worker thread index" in the tuple is 0.
- const std::size_t worker_id = std::get<0>(workorder_entry);
- fprintf(out,
- "%lu,%lu,%d,%lu,%lu\n",
- query_id,
- worker_id,
- worker_directory_->getNUMANode(worker_id),
- std::get<1>(workorder_entry), // Operator ID.
- std::get<2>(workorder_entry)); // Time.
- }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
deleted file mode 100644
index 7be57e7..0000000
--- a/query_execution/Foreman.hpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <vector>
-
-#include "query_execution/ForemanLite.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class StorageManager;
-class WorkerDirectory;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief The Foreman receives queries from the main thread, messages from the
- * policy enforcer and dispatches the work to worker threads. It also
- * receives work completion messages from workers.
- **/
-class Foreman final : public ForemanLite {
- public:
- /**
- * @brief Constructor.
- *
- * @param main_thread_client_id The TMB client ID of the main thread.
- * @param worker_directory The worker directory.
- * @param bus A pointer to the TMB.
- * @param catalog_database The catalog database where this query is executed.
- * @param storage_manager The StorageManager to use.
- * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
- * @param num_numa_nodes The number of NUMA nodes in the system.
- * @param profile_individual_workorders Whether every workorder's execution
- * be profiled or not.
- *
- * @note If cpu_id is not specified, Foreman thread can be possibly moved
- * around on different CPUs by the OS.
- **/
- Foreman(const tmb::client_id main_thread_client_id,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- const int cpu_id = -1,
- const std::size_t num_numa_nodes = 1,
- const bool profile_individual_workorders = false);
-
- ~Foreman() override {}
-
- /**
- * @brief Print the results of profiling individual work orders for a given
- * query.
- *
- * TODO(harshad) - Add the name of the operator to the output.
- * TODO(harshad) - Add the CPU core ID of the operator to the output. This
- * will require modifying the WorkerDirectory to remember worker affinities.
- * Until then, the users can refer to the worker_affinities provided to the
- * cli to infer the CPU core ID where a given worker is pinned.
- *
- * @param query_id The ID of the query for which the results are to be printed.
- * @param out The file stream.
- **/
- void printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const;
-
- protected:
- void run() override;
-
- private:
- /**
- * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
- * worker threads.
- *
- * @param messages The messages to be dispatched.
- **/
- void dispatchWorkerMessages(
- const std::vector<std::unique_ptr<WorkerMessage>> &messages);
-
- /**
- * @brief Send the given message to the specified worker.
- *
- * @param worker_thread_index The logical index of the recipient worker thread
- * in WorkerDirectory.
- * @param message The WorkerMessage to be sent.
- **/
- void sendWorkerMessage(const std::size_t worker_thread_index,
- const WorkerMessage &message);
-
- /**
- * @brief Check if we can collect new messages from the PolicyEnforcer.
- *
- * @param message_type The type of the last received message.
- **/
- bool canCollectNewMessages(const tmb::message_type_id message_type);
-
- const tmb::client_id main_thread_client_id_;
-
- WorkerDirectory *worker_directory_;
-
- CatalogDatabaseLite *catalog_database_;
- StorageManager *storage_manager_;
-
- std::unique_ptr<PolicyEnforcer> policy_enforcer_;
-
- DISALLOW_COPY_AND_ASSIGN(Foreman);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
new file mode 100644
index 0000000..274b8fc
--- /dev/null
+++ b/query_execution/ForemanBase.hpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A base class that Foreman implements. This class is used to derive
+ * for implementations for both the single-node and distributed versions.
+ **/
+class ForemanBase : public Thread {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param bus A pointer to the TMB.
+ * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+ *
+ * @note If cpu_id is not specified, Foreman thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ ForemanBase(tmb::MessageBus *bus,
+ const int cpu_id)
+ : bus_(DCHECK_NOTNULL(bus)),
+ cpu_id_(cpu_id) {
+ foreman_client_id_ = bus_->Connect();
+ }
+
+ ~ForemanBase() override {}
+
+ /**
+ * @brief Get the TMB client ID of Foreman thread.
+ *
+ * @return TMB client ID of foreman thread.
+ **/
+ tmb::client_id getBusClientID() const {
+ return foreman_client_id_;
+ }
+
+ protected:
+ void run() override = 0;
+
+ tmb::MessageBus *bus_;
+
+ tmb::client_id foreman_client_id_;
+
+ // The ID of the CPU that the Foreman thread can optionally be pinned to.
+ const int cpu_id_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ForemanBase);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/ForemanLite.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanLite.hpp b/query_execution/ForemanLite.hpp
deleted file mode 100644
index cb6cdf3..0000000
--- a/query_execution/ForemanLite.hpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A base class that Foreman implements. This class is used to derive
- * for implementations for both the single-node and distributed versions.
- **/
-class ForemanLite : public Thread {
- public:
- /**
- * @brief Constructor.
- *
- * @param bus A pointer to the TMB.
- * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
- *
- * @note If cpu_id is not specified, Foreman thread can be possibly moved
- * around on different CPUs by the OS.
- **/
- ForemanLite(tmb::MessageBus *bus,
- const int cpu_id)
- : bus_(DCHECK_NOTNULL(bus)),
- cpu_id_(cpu_id) {
- foreman_client_id_ = bus_->Connect();
- }
-
- ~ForemanLite() override {}
-
- /**
- * @brief Get the TMB client ID of Foreman thread.
- *
- * @return TMB client ID of foreman thread.
- **/
- tmb::client_id getBusClientID() const {
- return foreman_client_id_;
- }
-
- protected:
- void run() override = 0;
-
- tmb::MessageBus *bus_;
-
- tmb::client_id foreman_client_id_;
-
- // The ID of the CPU that the Foreman thread can optionally be pinned to.
- const int cpu_id_;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(ForemanLite);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
new file mode 100644
index 0000000..3aa1f0b
--- /dev/null
+++ b/query_execution/ForemanSingleNode.cpp
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanSingleNode.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+namespace quickstep {
+
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+ "of pending work orders for the worker. This information is used "
+ "by the Foreman to assign work orders to worker threads");
+
+ForemanSingleNode::ForemanSingleNode(
+ const tmb::client_id main_thread_client_id,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ const int cpu_id,
+ const size_t num_numa_nodes,
+ const bool profile_individual_workorders)
+ : ForemanBase(bus, cpu_id),
+ main_thread_client_id_(main_thread_client_id),
+ worker_directory_(DCHECK_NOTNULL(worker_directory)),
+ catalog_database_(DCHECK_NOTNULL(catalog_database)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ const std::vector<QueryExecutionMessageType> sender_message_types{
+ kPoisonMessage,
+ kRebuildWorkOrderMessage,
+ kWorkOrderMessage,
+ kWorkloadCompletionMessage};
+
+ for (const auto message_type : sender_message_types) {
+ bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+ }
+
+ const std::vector<QueryExecutionMessageType> receiver_message_types{
+ kAdmitRequestMessage,
+ kCatalogRelationNewBlockMessage,
+ kDataPipelineMessage,
+ kPoisonMessage,
+ kRebuildWorkOrderCompleteMessage,
+ kWorkOrderFeedbackMessage,
+ kWorkOrdersAvailableMessage,
+ kWorkOrderCompleteMessage};
+
+ for (const auto message_type : receiver_message_types) {
+ bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+ }
+
+ policy_enforcer_.reset(new PolicyEnforcer(
+ foreman_client_id_,
+ num_numa_nodes,
+ catalog_database_,
+ storage_manager_,
+ worker_directory_,
+ bus_,
+ profile_individual_workorders));
+}
+
+void ForemanSingleNode::run() {
+ if (cpu_id_ >= 0) {
+ // We can pin the foreman thread to a CPU if specified.
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ // Event loop
+ for (;;) {
+ // Receive() causes this thread to sleep until next message is received.
+ const AnnotatedMessage annotated_msg =
+ bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ switch (message_type) {
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrdersAvailableMessage: {
+ policy_enforcer_->processMessage(tagged_message);
+ break;
+ }
+
+ case kAdmitRequestMessage: {
+ const AdmitRequestMessage *msg =
+ static_cast<const AdmitRequestMessage *>(tagged_message.message());
+ const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
+ DCHECK(!query_handles.empty());
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
+ }
+ break;
+ }
+ case kPoisonMessage: {
+ if (policy_enforcer_->hasQueries()) {
+ LOG(WARNING) << "Foreman thread exiting while some queries are "
+ "under execution or waiting to be admitted";
+ }
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type to Foreman";
+ }
+
+ if (canCollectNewMessages(message_type)) {
+ vector<unique_ptr<WorkerMessage>> new_messages;
+ policy_enforcer_->getWorkerMessages(&new_messages);
+ dispatchWorkerMessages(new_messages);
+ }
+
+ // We check again, as some queries may produce zero work orders and finish
+ // their execution.
+ if (!policy_enforcer_->hasQueries()) {
+ // Signal the main thread that there are no queries to be executed.
+ // Currently the message doesn't have any real content.
+ const int dummy_payload = 0;
+ TaggedMessage completion_tagged_message(
+ &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(
+ bus_,
+ foreman_client_id_,
+ main_thread_client_id_,
+ move(completion_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID "
+ << foreman_client_id_ << " to main thread with TMB client ID"
+ << main_thread_client_id_;
+ }
+ }
+}
+
+bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
+ if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+ kCatalogRelationNewBlockMessage,
+ kWorkOrderFeedbackMessage)) {
+ return false;
+ } else if (worker_directory_->getLeastLoadedWorker().second <=
+ FLAGS_min_load_per_worker) {
+ // If the least loaded worker has only one pending work order, we should
+ // collect new messages and dispatch them.
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+ for (const auto &message : messages) {
+ DCHECK(message != nullptr);
+ const int recipient_worker_thread_index = message->getRecipientHint();
+ if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
+ sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
+ *message);
+ worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
+ } else {
+ const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+ sendWorkerMessage(least_loaded_worker_thread_index, *message);
+ worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
+ }
+ }
+}
+
+void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
+ const WorkerMessage &message) {
+ tmb::message_type_id type;
+ if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
+ type = kRebuildWorkOrderMessage;
+ } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
+ type = kWorkOrderMessage;
+ } else {
+ FATAL_ERROR("Invalid WorkerMessageType");
+ }
+ TaggedMessage worker_tagged_message(&message, sizeof(message), type);
+
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ worker_directory_->getClientID(worker_thread_index),
+ move(worker_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+ "Message could not be sent from Foreman with TMB client ID "
+ << foreman_client_id_ << " to Foreman with TMB client ID "
+ << worker_directory_->getClientID(worker_thread_index);
+}
+
+void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const {
+ const std::vector<
+ std::tuple<std::size_t, std::size_t, std::size_t>>
+ &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+ fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
+ for (auto workorder_entry : recorded_times) {
+ // Note: Index of the "worker thread index" in the tuple is 0.
+ const std::size_t worker_id = std::get<0>(workorder_entry);
+ fprintf(out,
+ "%lu,%lu,%d,%lu,%lu\n",
+ query_id,
+ worker_id,
+ worker_directory_->getNUMANode(worker_id),
+ std::get<1>(workorder_entry), // Operator ID.
+ std::get<2>(workorder_entry)); // Time.
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
new file mode 100644
index 0000000..7506d35
--- /dev/null
+++ b/query_execution/ForemanSingleNode.hpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ * policy enforcer and dispatches the work to worker threads. It also
+ * receives work completion messages from workers.
+ **/
+class ForemanSingleNode final : public ForemanBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param main_thread_client_id The TMB client ID of the main thread.
+ * @param worker_directory The worker directory.
+ * @param bus A pointer to the TMB.
+ * @param catalog_database The catalog database where this query is executed.
+ * @param storage_manager The StorageManager to use.
+ * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+ * @param num_numa_nodes The number of NUMA nodes in the system.
+ * @param profile_individual_workorders Whether every workorder's execution
+ * be profiled or not.
+ *
+ * @note If cpu_id is not specified, Foreman thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ ForemanSingleNode(const tmb::client_id main_thread_client_id,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ const int cpu_id = -1,
+ const std::size_t num_numa_nodes = 1,
+ const bool profile_individual_workorders = false);
+
+ ~ForemanSingleNode() override {}
+
+ /**
+ * @brief Print the results of profiling individual work orders for a given
+ * query.
+ *
+ * TODO(harshad) - Add the name of the operator to the output.
+ * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+ * will require modifying the WorkerDirectory to remember worker affinities.
+ * Until then, the users can refer to the worker_affinities provided to the
+ * cli to infer the CPU core ID where a given worker is pinned.
+ *
+ * @param query_id The ID of the query for which the results are to be printed.
+ * @param out The file stream.
+ **/
+ void printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const;
+
+ protected:
+ void run() override;
+
+ private:
+ /**
+ * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
+ * worker threads.
+ *
+ * @param messages The messages to be dispatched.
+ **/
+ void dispatchWorkerMessages(
+ const std::vector<std::unique_ptr<WorkerMessage>> &messages);
+
+ /**
+ * @brief Send the given message to the specified worker.
+ *
+ * @param worker_thread_index The logical index of the recipient worker thread
+ * in WorkerDirectory.
+ * @param message The WorkerMessage to be sent.
+ **/
+ void sendWorkerMessage(const std::size_t worker_thread_index,
+ const WorkerMessage &message);
+
+ /**
+ * @brief Check if we can collect new messages from the PolicyEnforcer.
+ *
+ * @param message_type The type of the last received message.
+ **/
+ bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+ const tmb::client_id main_thread_client_id_;
+
+ WorkerDirectory *worker_directory_;
+
+ CatalogDatabaseLite *catalog_database_;
+ StorageManager *storage_manager_;
+
+ std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+
+ DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 5b58f75..9cad47f 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -117,7 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 8c1d306..563a777 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -25,7 +25,7 @@
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/303e51ee/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index bb2a26f..d1d9380 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Worker.hpp"
#include "query_execution/WorkerDirectory.hpp"
@@ -80,11 +80,12 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_.reset(new Foreman(main_thread_client_id_,
- workers_.get(),
- &bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ foreman_.reset(
+ new ForemanSingleNode(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
foreman_->start();
worker_->start();
@@ -105,7 +106,7 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
TestDatabaseLoader test_database_loader_;
MessageBusImpl bus_;
- std::unique_ptr<Foreman> foreman_;
+ std::unique_ptr<ForemanSingleNode> foreman_;
std::unique_ptr<Worker> worker_;
std::unique_ptr<WorkerDirectory> workers_;
[2/2] incubator-quickstep git commit: Added ExecutionGenerator
support for Window Aggregation.
Posted by zu...@apache.org.
Added ExecutionGenerator support for Window Aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7671a589
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7671a589
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7671a589
Branch: refs/heads/SQL-window-aggregation
Commit: 7671a5893349f060d95e20cd8b98ad076c614cec
Parents: 303e51e
Author: shixuan-fan <sh...@apache.org>
Authored: Wed Jun 29 20:25:18 2016 +0000
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Jul 8 11:15:20 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 2 +
query_execution/QueryContext.cpp | 14 ++
query_execution/QueryContext.hpp | 48 +++++
query_execution/QueryContext.proto | 5 +-
query_optimizer/CMakeLists.txt | 2 +
query_optimizer/ExecutionGenerator.cpp | 101 ++++++++++-
query_optimizer/ExecutionGenerator.hpp | 8 +
query_optimizer/cost_model/CMakeLists.txt | 2 +
query_optimizer/cost_model/SimpleCostModel.cpp | 9 +
query_optimizer/cost_model/SimpleCostModel.hpp | 5 +
.../cost_model/StarSchemaSimpleCostModel.cpp | 8 +
.../cost_model/StarSchemaSimpleCostModel.hpp | 4 +
query_optimizer/resolver/Resolver.cpp | 7 +-
.../tests/execution_generator/Select.test | 39 ++++
query_optimizer/tests/resolver/Select.test | 33 ++++
relational_operators/CMakeLists.txt | 14 ++
.../WindowAggregationOperator.cpp | 82 +++++++++
.../WindowAggregationOperator.hpp | 166 +++++++++++++++++
relational_operators/WorkOrder.proto | 9 +
storage/CMakeLists.txt | 45 ++++-
storage/WindowAggregationOperationState.cpp | 179 +++++++++++++++++++
storage/WindowAggregationOperationState.hpp | 177 ++++++++++++++++++
storage/WindowAggregationOperationState.proto | 33 ++++
...WindowAggregationOperationState_unittest.cpp | 92 ++++++++++
24 files changed, 1079 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 2be451c..e1b1183 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -118,6 +118,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext
quickstep_storage_HashTableFactory
quickstep_storage_InsertDestination
quickstep_storage_InsertDestination_proto
+ quickstep_storage_WindowAggregationOperationState
quickstep_types_TypedValue
quickstep_types_containers_Tuple
quickstep_utility_BloomFilter
@@ -130,6 +131,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
quickstep_storage_AggregationOperationState_proto
quickstep_storage_HashTable_proto
quickstep_storage_InsertDestination_proto
+ quickstep_storage_WindowAggregationOperationState_proto
quickstep_types_containers_Tuple_proto
quickstep_utility_SortConfiguration_proto
${PROTOBUF_LIBRARY})
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 54dd557..7019b6a 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -140,6 +140,13 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
update_groups_.push_back(move(update_group));
}
+
+ for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
+ window_aggregation_states_.emplace_back(
+ WindowAggregationOperationState::ReconstructFromProto(proto.window_aggregation_states(i),
+ database,
+ storage_manager));
+ }
}
bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
@@ -231,6 +238,13 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
}
}
+ for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
+ if (!WindowAggregationOperationState::ProtoIsValid(proto.window_aggregation_states(i),
+ database)) {
+ return false;
+ }
+ }
+
return proto.IsInitialized();
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7d5628d..9171250 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -33,6 +33,7 @@
#include "storage/AggregationOperationState.hpp"
#include "storage/HashTable.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/BloomFilter.hpp"
#include "utility/Macros.hpp"
@@ -120,6 +121,11 @@ class QueryContext {
typedef std::uint32_t update_group_id;
/**
+ * @brief A unique identifier for a window aggregation state.
+ **/
+ typedef std::uint32_t window_aggregation_state_id;
+
+ /**
* @brief Constructor.
*
* @param proto A serialized Protocol Buffer representation of a
@@ -460,6 +466,47 @@ class QueryContext {
return update_groups_[id];
}
+ /**
+ * @brief Whether the given WindowAggregationOperationState id is valid.
+ *
+ * @param id The WindowAggregationOperationState id.
+ *
+ * @return True if valid, otherwise false.
+ **/
+ bool isValidWindowAggregationStateId(const window_aggregation_state_id id) const {
+ return id < window_aggregation_states_.size();
+ }
+
+ /**
+ * @brief Get the WindowAggregationOperationState.
+ *
+ * @param id The WindowAggregationOperationState id in the query.
+ *
+ * @return The WindowAggregationOperationState, already created in the
+ * constructor.
+ **/
+ inline WindowAggregationOperationState* getWindowAggregationState(
+ const window_aggregation_state_id id) {
+ DCHECK_LT(id, window_aggregation_states_.size());
+ DCHECK(window_aggregation_states_[id]);
+ return window_aggregation_states_[id].get();
+ }
+
+ /**
+ * @brief Release the given WindowAggregationOperationState.
+ *
+ * @param id The id of the WindowAggregationOperationState to destroy.
+ *
+ * @return The WindowAggregationOperationState, already created in the
+ * constructor.
+ **/
+ inline WindowAggregationOperationState* releaseWindowAggregationState(
+ const window_aggregation_state_id id) {
+ DCHECK_LT(id, window_aggregation_states_.size());
+ DCHECK(window_aggregation_states_[id]);
+ return window_aggregation_states_[id].release();
+ }
+
private:
std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
@@ -471,6 +518,7 @@ class QueryContext {
std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;
std::vector<std::unique_ptr<Tuple>> tuples_;
std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
+ std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_;
DISALLOW_COPY_AND_ASSIGN(QueryContext);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 98cd0b6..ddf8326 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -22,6 +22,7 @@ import "expressions/table_generator/GeneratorFunction.proto";
import "storage/AggregationOperationState.proto";
import "storage/HashTable.proto";
import "storage/InsertDestination.proto";
+import "storage/WindowAggregationOperationState.proto";
import "types/containers/Tuple.proto";
import "utility/BloomFilter.proto";
import "utility/SortConfiguration.proto";
@@ -55,5 +56,7 @@ message QueryContext {
// NOTE(zuyu): For UpdateWorkOrder only.
repeated UpdateGroup update_groups = 10;
- required uint64 query_id = 11;
+ repeated WindowAggregationOperationState window_aggregation_states = 11;
+
+ required uint64 query_id = 12;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 8912414..7e53b9d 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -88,6 +88,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_expressions_Predicate
quickstep_queryoptimizer_expressions_Scalar
quickstep_queryoptimizer_expressions_ScalarLiteral
+ quickstep_queryoptimizer_expressions_WindowAggregateFunction
quickstep_queryoptimizer_physical_Aggregate
quickstep_queryoptimizer_physical_CopyFrom
quickstep_queryoptimizer_physical_CreateIndex
@@ -130,6 +131,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_UpdateOperator
+ quickstep_relationaloperators_WindowAggregationOperator
quickstep_storage_AggregationOperationState_proto
quickstep_storage_HashTableFactory
quickstep_storage_HashTable_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 45f5f78..43d63f9 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -63,6 +63,7 @@
#include "query_optimizer/expressions/PatternMatcher.hpp"
#include "query_optimizer/expressions/Scalar.hpp"
#include "query_optimizer/expressions/ScalarLiteral.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
@@ -104,6 +105,7 @@
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
#include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WindowAggregationOperator.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/HashTable.pb.h"
#include "storage/HashTableFactory.hpp"
@@ -284,8 +286,8 @@ void ExecutionGenerator::generatePlanInternal(
return convertUpdateTable(
std::static_pointer_cast<const P::UpdateTable>(physical_plan));
case P::PhysicalType::kWindowAggregate:
- THROW_SQL_ERROR()
- << "Window aggregate function is not supported yet :(";
+ return convertWindowAggregate(
+ std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
default:
LOG(FATAL) << "Unknown physical plan node "
<< physical_plan->getShortString();
@@ -1639,5 +1641,100 @@ void ExecutionGenerator::convertTableGenerator(
temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
}
+void ExecutionGenerator::convertWindowAggregate(
+ const P::WindowAggregatePtr &physical_plan) {
+ // Create window_aggregation_operation_state proto.
+ const QueryContext::window_aggregation_state_id window_aggr_state_index =
+ query_context_proto_->window_aggregation_states_size();
+ S::WindowAggregationOperationState *window_aggr_state_proto =
+ query_context_proto_->add_window_aggregation_states();
+
+ // Get input.
+ const CatalogRelationInfo *input_relation_info =
+ findRelationInfoOutputByPhysical(physical_plan->input());
+ window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+
+ // Get window aggregate function expression.
+ const E::AliasPtr &named_window_aggregate_expression =
+ physical_plan->window_aggregate_expression();
+ const E::WindowAggregateFunctionPtr &window_aggregate_function =
+ std::static_pointer_cast<const E::WindowAggregateFunction>(
+ named_window_aggregate_expression->expression());
+
+ // Set the AggregateFunction.
+ window_aggr_state_proto->mutable_function()->MergeFrom(
+ window_aggregate_function->window_aggregate().getProto());
+
+ // Set the arguments.
+ for (const E::ScalarPtr &argument : window_aggregate_function->arguments()) {
+ unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+ window_aggr_state_proto->add_arguments()->MergeFrom(concretized_argument->getProto());
+ }
+
+ // Set partition keys.
+ const E::WindowInfo &window_info = window_aggregate_function->window_info();
+ for (const E::ScalarPtr &partition_by_attribute
+ : window_info.partition_by_attributes) {
+ unique_ptr<const Scalar> concretized_partition_by_attribute(
+ partition_by_attribute->concretize(attribute_substitution_map_));
+ window_aggr_state_proto->add_partition_by_attributes()
+ ->MergeFrom(concretized_partition_by_attribute->getProto());
+ }
+
+ // Set window frame info.
+ if (window_info.frame_info == nullptr) {
+ // If the frame is not specified, use the default setting:
+ // 1. If ORDER BY key is specified, use cumulative aggregation:
+ // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+ // 2. If ORDER BY key is not specified either, use the whole partition:
+ // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+ window_aggr_state_proto->set_is_row(true); // frame mode: ROWS.
+ window_aggr_state_proto->set_num_preceding(-1); // UNBOUNDED PRECEDING.
+ window_aggr_state_proto->set_num_following(
+ window_info.order_by_attributes.empty()
+ ? -1 // UNBOUNDED FOLLOWING.
+ : 0); // CURRENT ROW.
+ } else {
+ const E::WindowFrameInfo *window_frame_info = window_info.frame_info;
+ window_aggr_state_proto->set_is_row(window_frame_info->is_row);
+ window_aggr_state_proto->set_num_preceding(window_frame_info->num_preceding);
+ window_aggr_state_proto->set_num_following(window_frame_info->num_following);
+ }
+
+ // Create InsertDestination proto.
+ const CatalogRelation *output_relation = nullptr;
+ const QueryContext::insert_destination_id insert_destination_index =
+ query_context_proto_->insert_destinations_size();
+ S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ createTemporaryCatalogRelation(physical_plan,
+ &output_relation,
+ insert_destination_proto);
+
+ const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
+ execution_plan_->addRelationalOperator(
+ new WindowAggregationOperator(query_handle_->query_id(),
+ *output_relation,
+ window_aggr_state_index,
+ insert_destination_index));
+
+ // TODO(Shixuan): Once parallelism is introduced, the is_pipeline_breaker
+ // could be set to false.
+ if (!input_relation_info->isStoredRelation()) {
+ execution_plan_->addDirectDependency(window_aggregation_operator_index,
+ input_relation_info->producer_operator_index,
+ true /* is_pipeline_breaker */);
+ }
+
+ insert_destination_proto->set_relational_op_index(window_aggregation_operator_index);
+
+ // Add to map and temp_relation_info_vec.
+ physical_to_output_relation_map_.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(physical_plan),
+ std::forward_as_tuple(window_aggregation_operator_index, output_relation));
+ temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index,
+ output_relation);
+}
+
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index c7fd018..9186707 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -60,6 +60,7 @@
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/physical/UpdateTable.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -347,6 +348,13 @@ class ExecutionGenerator {
void convertTableGenerator(const physical::TableGeneratorPtr &physical_plan);
/**
+ * @brief Converts a physical WindowAggregate to a WindowAggregation operator.
+ *
+ * @param physical_plan The WindowAggregate to be converted.
+ */
+ void convertWindowAggregate(const physical::WindowAggregatePtr &physical_plan);
+
+ /**
* @brief Converts a list of NamedExpressions in the optimizer expression
* system to scalars proto in QueryContext proto.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 6bf5240..5d5b596 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -40,6 +40,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
quickstep_queryoptimizer_physical_TableGenerator
quickstep_queryoptimizer_physical_TableReference
quickstep_queryoptimizer_physical_TopLevelPlan
+ quickstep_queryoptimizer_physical_WindowAggregate
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
glog
@@ -64,6 +65,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_queryoptimizer_physical_TableGenerator
quickstep_queryoptimizer_physical_TableReference
quickstep_queryoptimizer_physical_TopLevelPlan
+ quickstep_queryoptimizer_physical_WindowAggregate
quickstep_utility_Macros)
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index 48f76fa..e5222ff 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -33,6 +33,7 @@
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
#include "glog/logging.h"
@@ -72,6 +73,9 @@ std::size_t SimpleCostModel::estimateCardinality(
return estimateCardinality(
shared_subplans_[shared_subplan_reference->subplan_id()]);
}
+ case P::PhysicalType::kWindowAggregate:
+ return estimateCardinalityForWindowAggregate(
+ std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
default:
LOG(FATAL) << "Unsupported physical plan:" << physical_plan->toString();
}
@@ -118,6 +122,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
estimateCardinality(physical_plan->input()) / 10);
}
+std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
+ const physical::WindowAggregatePtr &physical_plan) {
+ return estimateCardinality(physical_plan->input());
+}
+
} // namespace cost
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 9862198..9837039 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -32,6 +32,7 @@
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -88,6 +89,10 @@ class SimpleCostModel : public CostModel {
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ // Return the estimated cardinality of the input plan.
+ std::size_t estimateCardinalityForWindowAggregate(
+ const physical::WindowAggregatePtr &physical_plan);
+
const std::vector<physical::PhysicalPtr> &shared_subplans_;
DISALLOW_COPY_AND_ASSIGN(SimpleCostModel);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index eb9fcc1..badfeb1 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -85,6 +85,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
case P::PhysicalType::kSort:
return estimateCardinality(
std::static_pointer_cast<const P::Sort>(physical_plan)->input());
+ case P::PhysicalType::kWindowAggregate:
+ return estimateCardinalityForWindowAggregate(
+ std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
default:
LOG(FATAL) << "Unsupported physical plan:" << physical_plan->toString();
}
@@ -141,6 +144,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
estimateCardinality(physical_plan->input()) / 10);
}
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
+ const P::WindowAggregatePtr &physical_plan) {
+ return estimateCardinality(physical_plan->input());
+}
+
double StarSchemaSimpleCostModel::estimateSelectivity(
const physical::PhysicalPtr &physical_plan) {
switch (physical_plan->getPhysicalType()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index c63e55a..83032cf 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -33,6 +33,7 @@
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/physical/WindowAggregate.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -94,6 +95,9 @@ class StarSchemaSimpleCostModel : public CostModel {
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
+ std::size_t estimateCardinalityForWindowAggregate(
+ const physical::WindowAggregatePtr &physical_plan);
+
double estimateSelectivityForSelection(
const physical::SelectionPtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c07751a..f10378b 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -1860,12 +1860,17 @@ L::LogicalPtr Resolver::resolveJoinedTableReference(
L::LogicalPtr Resolver::resolveSortInWindow(
const L::LogicalPtr &logical_plan,
const E::WindowInfo &window_info) {
- // Sort the table by (p_key, o_key)
+ // Sort the table by (p_key, o_key).
std::vector<E::AttributeReferencePtr> sort_attributes(window_info.partition_by_attributes);
sort_attributes.insert(sort_attributes.end(),
window_info.order_by_attributes.begin(),
window_info.order_by_attributes.end());
+ // If (p_key, o_key) is empty, no sort is needed.
+ if (sort_attributes.empty()) {
+ return logical_plan;
+ }
+
std::vector<bool> sort_directions(
window_info.partition_by_attributes.size(), true);
sort_directions.insert(sort_directions.end(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 05f7108..16127cc 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -950,3 +950,42 @@ WHERE double_col < 0
+--------------------+
| 5|
+--------------------+
+==
+
+# Window Aggregation Test.
+# Currently this is not supported, an empty table will be returned.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
++------------------------+
+|avg(int_col) |
++------------------------+
++------------------------+
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY char_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
++-----------+------------------------+
+|int_col |sum(float_col) |
++-----------+------------------------+
++-----------+------------------------+
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
++------------------------+
+|sum(avg(int_col)) |
++------------------------+
+| NULL|
++------------------------+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/query_optimizer/tests/resolver/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index 89ab84d..5e11ac0 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -3269,6 +3269,39 @@ TopLevelPlan
type=Double NULL]
==
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | type=VarChar(20) NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| | +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[],
+| | nulls_first=[],frame_mode=row,num_preceding=-1,num_following=-1]
+| | +-arguments=
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-partition_by=
+| | | +-[]
+| | +-order_by=
+| | +-[]
+| +-project_list=
+| +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+| +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
SELECT int_col, sum(float_col) OVER w1 FROM test
WINDOW w2 AS
(PARTITION BY vchar_col, long_col
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 91d1097..249441d 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGener
add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp)
add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp)
+add_library(quickstep_relationaloperators_WindowAggregationOperator WindowAggregationOperator.cpp WindowAggregationOperator.hpp)
add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp)
add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp)
add_library(quickstep_relationaloperators_WorkOrder_proto
@@ -423,6 +424,18 @@ target_link_libraries(quickstep_relationaloperators_UpdateOperator
quickstep_threading_ThreadIDBasedMap
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder
glog
quickstep_queryexecution_QueryExecutionTypedefs
@@ -487,6 +500,7 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_UpdateOperator
+ quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_WorkOrder_proto)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
new file mode 100644
index 0000000..93cb9d4
--- /dev/null
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "relational_operators/WindowAggregationOperator.hpp"
+
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool WindowAggregationOperator::getAllWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
+ DCHECK(query_context != nullptr);
+
+ if (blocking_dependencies_met_ && !generated_) {
+ container->addNormalWorkOrder(
+ new WindowAggregationWorkOrder(
+ query_id_,
+ query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ generated_ = true;
+ }
+
+ return generated_;
+}
+
+bool WindowAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (blocking_dependencies_met_ && !generated_) {
+ container->addWorkOrderProto(createWorkOrderProto(), op_index_);
+ generated_ = true;
+ }
+
+ return generated_;
+}
+
+serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::WINDOW_AGGREGATION);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
+ window_aggregation_state_index_);
+ proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
+ output_destination_index_);
+
+ return proto;
+}
+
+
+void WindowAggregationWorkOrder::execute() {
+ std::cout << "Window aggregation is not supported yet.\n"
+ << "An empty table is returned\n";
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
new file mode 100644
index 0000000..f3dfd14
--- /dev/null
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -0,0 +1,166 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class StorageManager;
+class WindowAggregationOperationState;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator which performs window aggregation over a relation.
+ **/
+class WindowAggregationOperator : public RelationalOperator {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of this query.
+ * @param input_relation The relation to perform aggregation over.
+ * @param window_aggregation_state_index The index of WindowAggregationState
+ * in QueryContext.
+ * @param output_destination_index The index of InsertDestination in
+ * QueryContext for the output.
+ **/
+ WindowAggregationOperator(const std::size_t query_id,
+ const CatalogRelation &output_relation,
+ const QueryContext::window_aggregation_state_id window_aggregation_state_index,
+ const QueryContext::insert_destination_id output_destination_index)
+ : RelationalOperator(query_id),
+ output_relation_(output_relation),
+ window_aggregation_state_index_(window_aggregation_state_index),
+ output_destination_index_(output_destination_index),
+ generated_(false) {}
+
+ ~WindowAggregationOperator() override {}
+
+ bool getAllWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) override;
+
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ const relation_id getOutputRelationID() const override {
+ return output_relation_.getID();
+ }
+
+ QueryContext::insert_destination_id getInsertDestinationID() const override {
+ return output_destination_index_;
+ }
+
+ private:
+ /**
+ * @brief Create Work Order proto.
+ *
+ * @return A window aggregation work order.
+ **/
+ serialization::WorkOrder* createWorkOrderProto();
+
+ const CatalogRelation &output_relation_;
+ const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
+ const QueryContext::insert_destination_id output_destination_index_;
+ bool generated_;
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by WindowAggregationOperator.
+ **/
+class WindowAggregationWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor
+ *
+ * @param query_id The ID of this query.
+ * @param state The WindowAggregationOperatorState to use.
+ * @param output_destination The InsertDestination for output.
+ **/
+ WindowAggregationWorkOrder(const std::size_t query_id,
+ WindowAggregationOperationState *state,
+ InsertDestination *output_destination)
+ : WorkOrder(query_id),
+ state_(state),
+ output_destination_(output_destination) {}
+
+ ~WindowAggregationWorkOrder() override {}
+
+ /**
+ * @brief Get the pointer to WindowAggregationOperationState.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return A pointer to the window aggregation operation state.
+ **/
+ WindowAggregationOperationState* state() {
+ return state_;
+ }
+
+ /**
+ * @brief Get the pointer to output destination.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return A pointer to the output destination.
+ **/
+ InsertDestination* output_destination() {
+ return output_destination_;
+ }
+
+ void execute() override;
+
+ private:
+ WindowAggregationOperationState *state_;
+ InsertDestination *output_destination_;
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_WINDOW_AGGREGATION_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 3ed065a..69dee1b 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -41,6 +41,7 @@ enum WorkOrderType {
TABLE_GENERATOR = 17;
TEXT_SCAN = 18;
UPDATE = 19;
+ WINDOW_AGGREGATION = 20;
}
message WorkOrder {
@@ -243,3 +244,11 @@ message UpdateWorkOrder {
optional fixed64 block_id = 325;
}
}
+
+message WindowAggregationWorkOrder {
+ extend WorkOrder {
+ // All required
+ optional uint32 window_aggr_state_index = 336;
+ optional int32 insert_destination_index = 337;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index b536411..9df66e1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -131,6 +131,9 @@ QS_PROTOBUF_GENERATE_CPP(storage_InsertDestination_proto_srcs
QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
storage_StorageBlockLayout_proto_hdrs
StorageBlockLayout.proto)
+QS_PROTOBUF_GENERATE_CPP(storage_WindowAggregationOperationState_proto_srcs
+ storage_WindowAggregationOperationState_proto_hdrs
+ WindowAggregationOperationState.proto)
if (ENABLE_DISTRIBUTED)
GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
@@ -254,6 +257,11 @@ add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp
add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
+add_library(quickstep_storage_WindowAggregationOperationState
+ WindowAggregationOperationState.hpp
+ WindowAggregationOperationState.cpp)
+add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_WindowAggregationOperationState_proto_srcs})
+
# Link dependencies:
target_link_libraries(quickstep_storage_AggregationOperationState
@@ -1038,6 +1046,27 @@ target_link_libraries(quickstep_storage_ValueAccessorUtil
quickstep_storage_ValueAccessor
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_WindowAggregationOperationState
+ glog
+ quickstep_catalog_CatalogDatabaseLite
+ quickstep_catalog_CatalogRelationSchema
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_ExpressionFactories
+ quickstep_expressions_Expressions_proto
+ quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_aggregation_AggregateFunctionFactory
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarAttribute
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_WindowAggregationOperationState_proto
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
+ quickstep_expressions_aggregation_AggregateFunction_proto
+ quickstep_expressions_Expressions_proto
+ ${PROTOBUF_LIBRARY})
# Module all-in-one library:
add_library(quickstep_storage ../empty_src.cpp StorageModule.hpp)
@@ -1096,7 +1125,9 @@ target_link_libraries(quickstep_storage
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
- quickstep_storage_ValueAccessorUtil)
+ quickstep_storage_ValueAccessorUtil
+ quickstep_storage_WindowAggregationOperationState
+ quickstep_storage_WindowAggregationOperationState_proto)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage
quickstep_storage_FileManagerHdfs)
@@ -1636,6 +1667,18 @@ target_link_libraries(StorageManager_unittest
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
quickstep_utility_ShardedLockManager)
+
+add_executable(WindowAggregationOperationState_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/WindowAggregationOperationState_unittest.cpp")
+target_link_libraries(WindowAggregationOperationState_unittest
+ gtest
+ gtest_main
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_WindowAggregationOperationState
+ quickstep_storage_WindowAggregationOperationState_proto
+ ${LIBS})
if (QUICKSTEP_HAVE_LIBNUMA)
target_link_libraries(StorageManager_unittest
${LIBNUMA_LIBRARY})
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
new file mode 100644
index 0000000..a0bcc37
--- /dev/null
+++ b/storage/WindowAggregationOperationState.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "storage/WindowAggregationOperationState.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabaseLite.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/ExpressionFactories.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationOperationState::WindowAggregationOperationState(
+ const CatalogRelationSchema &input_relation,
+ const AggregateFunction *window_aggregate_function,
+ std::vector<std::unique_ptr<const Scalar>> &&arguments,
+ std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ StorageManager *storage_manager)
+ : input_relation_(input_relation),
+ arguments_(std::move(arguments)),
+ partition_by_attributes_(std::move(partition_by_attributes)),
+ is_row_(is_row),
+ num_preceding_(num_preceding),
+ num_following_(num_following),
+ storage_manager_(storage_manager) {
+ // Get the Types of this window aggregate's arguments so that we can create an
+ // AggregationHandle.
+ // TODO(Shixuan): Next step: New handles for window aggregation function.
+ std::vector<const Type*> argument_types;
+ for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+ argument_types.emplace_back(&argument->getType());
+ }
+
+ // Check if window aggregate function could apply to the arguments.
+ DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+
+ // Create the handle and initial state.
+ window_aggregation_handle_.reset(
+ window_aggregate_function->createHandle(argument_types));
+ window_aggregation_state_.reset(
+ window_aggregation_handle_->createInitialState());
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+ // See if all of this window aggregate's arguments are attributes in the input
+ // relation. If so, remember the attribute IDs so that we can do copy elision
+ // when actually performing the window aggregation.
+ arguments_as_attributes_.reserve(arguments_.size());
+ for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+ const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+ if (argument_id == -1) {
+ arguments_as_attributes_.clear();
+ break;
+ } else {
+ DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+ arguments_as_attributes_.push_back(argument_id);
+ }
+ }
+#endif
+}
+
+WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
+ const serialization::WindowAggregationOperationState &proto,
+ const CatalogDatabaseLite &database,
+ StorageManager *storage_manager) {
+ DCHECK(ProtoIsValid(proto, database));
+
+ // Rebuild contructor arguments from their representation in 'proto'.
+ const AggregateFunction *aggregate_function
+ = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+
+ std::vector<std::unique_ptr<const Scalar>> arguments;
+ arguments.reserve(proto.arguments_size());
+ for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
+ arguments.emplace_back(ScalarFactory::ReconstructFromProto(
+ proto.arguments(argument_idx),
+ database));
+ }
+
+ std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+ for (int attribute_idx = 0;
+ attribute_idx < proto.partition_by_attributes_size();
+ ++attribute_idx) {
+ partition_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+ proto.partition_by_attributes(attribute_idx),
+ database));
+ }
+
+ const bool is_row = proto.is_row();
+ const std::int64_t num_preceding = proto.num_preceding();
+ const std::int64_t num_following = proto.num_following();
+
+ return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
+ aggregate_function,
+ std::move(arguments),
+ std::move(partition_by_attributes),
+ is_row,
+ num_preceding,
+ num_following,
+ storage_manager);
+}
+
+bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+ const CatalogDatabaseLite &database) {
+ if (!proto.IsInitialized() ||
+ !database.hasRelationWithId(proto.relation_id())) {
+ return false;
+ }
+
+ if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+ return false;
+ }
+
+ // TODO(chasseur): We may also want to check that the specified
+ // AggregateFunction is applicable to the specified arguments, but that
+ // requires partial deserialization and may be too heavyweight for this
+ // method.
+ // TODO(Shixuan): The TODO for AggregateFunction could also be applied here.
+ for (int argument_idx = 0;
+ argument_idx < proto.arguments_size();
+ ++argument_idx) {
+ if (!ScalarFactory::ProtoIsValid(proto.arguments(argument_idx), database)) {
+ return false;
+ }
+ }
+
+ for (int attribute_idx = 0;
+ attribute_idx < proto.partition_by_attributes_size();
+ ++attribute_idx) {
+ if (!ScalarFactory::ProtoIsValid(proto.partition_by_attributes(attribute_idx),
+ database)) {
+ return false;
+ }
+ }
+
+ if (proto.num_preceding() < -1 || proto.num_following() < -1) {
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
new file mode 100644
index 0000000..d7b3e6a
--- /dev/null
+++ b/storage/WindowAggregationOperationState.hpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+#define QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class CatalogDatabaseLite;
+class CatalogRelationSchema;
+class InsertDestination;
+class StorageManager;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Helper class for maintaining the state of window aggregation.
+ **/
+class WindowAggregationOperationState {
+ public:
+ /**
+ * @brief Constructor for window aggregation operation state.
+ *
+ * @param input_relation Input relation on which window aggregation is computed.
+ * @param window_aggregate_functions The window aggregate function to be
+ * computed.
+ * @param arguments A list of argument expressions to that aggregate.
+ * @param partition_by_attributes A list of window partition key.
+ * @param is_row True if the window frame is calculated by ROW, false if it is
+ * calculated by RANGE.
+ * @param num_preceding The number of rows/range for the tuples preceding the
+ * current row. -1 means UNBOUNDED PRECEDING.
+ * @param num_following The number of rows/range for the tuples following the
+ * current row. -1 means UNBOUNDED FOLLOWING.
+ * @param storage_manager The StorageManager to use for allocating hash
+ * tables.
+ */
+ WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+ const AggregateFunction *window_aggregate_function,
+ std::vector<std::unique_ptr<const Scalar>> &&arguments,
+ std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ StorageManager *storage_manager);
+
+ ~WindowAggregationOperationState() {}
+
+ /**
+ * @brief Generate the window aggregation operation state from the serialized
+ * Protocol Buffer representation.
+ *
+ * @param proto A serialized protocol buffer representation of a
+ * WindowAggregationOperationState, originally generated by the
+ * optimizer.
+ * @param database The database for resolving relation and attribute
+ * references.
+ * @param storage_manager The StorageManager to use.
+ **/
+ static WindowAggregationOperationState* ReconstructFromProto(
+ const serialization::WindowAggregationOperationState &proto,
+ const CatalogDatabaseLite &database,
+ StorageManager *storage_manager);
+
+ /**
+ * @brief Check whether a serialization::AggregationOperationState is
+ * fully-formed and all parts are valid.
+ *
+ * @param proto A serialized Protocol Buffer representation of an
+ * AggregationOperationState, originally generated by the optimizer.
+ * @param database The Database to resolve relation and attribute references
+ * in.
+ * @return Whether proto is fully-formed and valid.
+ **/
+ static bool ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
+ const CatalogDatabaseLite &database);
+
+ /**
+ * @brief Get the is_row info.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return True if the frame mode is ROW, false if it is RANGE.
+ **/
+ const bool is_row() const { return is_row_; }
+
+ /**
+ * @brief Get the num_preceding info.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return The number of rows/range that precedes the current row.
+ **/
+ const std::int64_t num_preceding() const { return num_preceding_; }
+
+ /**
+ * @brief Get the num_following info.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return The number of rows/range that follows the current row.
+ **/
+ const std::int64_t num_following() const { return num_following_; }
+
+ /**
+ * @brief Get the pointer to StorageManager.
+ * @note This is a quickfix for "unused variable". After the window aggregate
+ * functions are built, these methods might be dropped.
+ *
+ * @return A pointer to the storage manager.
+ **/
+ StorageManager *storage_manager() { return storage_manager_; }
+
+ private:
+ const CatalogRelationSchema &input_relation_;
+
+ // TODO(Shixuan): Handle and State for window aggregation will be needed for
+ // actual calculation.
+ std::unique_ptr<AggregationHandle> window_aggregation_handle_;
+ std::unique_ptr<AggregationState> window_aggregation_state_;
+ std::vector<std::unique_ptr<const Scalar>> arguments_;
+
+ // We don't add order_by_attributes here since it is not needed after sorting.
+ std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
+
+ // Window framing information.
+ const bool is_row_;
+ const std::int64_t num_preceding_;
+ const std::int64_t num_following_;
+
+ StorageManager *storage_manager_;
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+ // If all an aggregate's argument expressions are simply attributes in
+ // 'input_relation_', then this caches the attribute IDs of those arguments.
+ std::vector<attribute_id> arguments_as_attributes_;
+#endif
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_WINDOW_AGGREGATION_OPERATION_STATE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
new file mode 100644
index 0000000..c7bd0ef
--- /dev/null
+++ b/storage/WindowAggregationOperationState.proto
@@ -0,0 +1,33 @@
+// Copyright 2011-2015 Quickstep Technologies LLC.
+// Copyright 2015-2016 Pivotal Software, Inc.
+// Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+// University of Wisconsin\u2014Madison.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/Expressions.proto";
+
+message WindowAggregationOperationState {
+ required int32 relation_id = 1;
+ required AggregateFunction function = 2;
+ repeated Scalar arguments = 3;
+ repeated Scalar partition_by_attributes = 4;
+ required bool is_row = 5;
+ required int64 num_preceding = 6; // -1 means UNBOUNDED PRECEDING.
+ required int64 num_following = 7; // -1 means UNBOUNDED FOLLOWING.
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7671a589/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
new file mode 100644
index 0000000..c572034
--- /dev/null
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <memory>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "storage/WindowAggregationOperationState.hpp"
+#include "storage/WindowAggregationOperationState.pb.h"
+
+#include "gtest/gtest.h"
+
+using std::unique_ptr;
+
+namespace quickstep {
+
+namespace {
+ constexpr relation_id kInvalidTableId = 100;
+ constexpr std::int64_t kInvalidNum = -10;
+ constexpr std::int64_t kValidNum = 10;
+} // namespace
+
+class WindowAggregationOperationStateProtoTest : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ database_.reset(new CatalogDatabase(nullptr, "db"));
+ rel_id_ = database_->addRelation(new CatalogRelation(nullptr, "rel"));
+ }
+
+ unique_ptr<CatalogDatabase> database_;
+ relation_id rel_id_;
+};
+
+TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
+ serialization::WindowAggregationOperationState proto;
+ EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
+ serialization::WindowAggregationOperationState proto;
+ proto.set_relation_id(kInvalidTableId);
+ proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_is_row(true);
+ proto.set_num_preceding(kValidNum);
+ proto.set_num_following(kValidNum);
+ EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
+ serialization::WindowAggregationOperationState proto;
+ proto.set_relation_id(rel_id_);
+ proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_is_row(true);
+ proto.set_num_preceding(kInvalidNum);
+ proto.set_num_following(kValidNum);
+ EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+
+ proto.set_num_preceding(kValidNum);
+ proto.set_num_following(kInvalidNum);
+ EXPECT_FALSE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
+ serialization::WindowAggregationOperationState proto;
+ proto.set_relation_id(rel_id_);
+ proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_is_row(true);
+ proto.set_num_preceding(kValidNum);
+ proto.set_num_following(kValidNum);
+ EXPECT_TRUE(WindowAggregationOperationState::ProtoIsValid(proto, *database_.get()));
+}
+
+} // namespace quickstep