You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/29 23:44:13 UTC
[10/25] incubator-quickstep git commit: Renamed Foreman related
classes.
Renamed Foreman related classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4fb884c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4fb884c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4fb884c1
Branch: refs/heads/dist-exe-test-new
Commit: 4fb884c1bef53b00ec3e0362b8de401b7c0b07f6
Parents: a37bf26
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Jul 7 14:13:19 2016 -0500
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Jul 29 16:42:10 2016 -0700
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
cli/CMakeLists.txt | 1 -
cli/CommandExecutor.cpp | 1 -
cli/CommandExecutor.hpp | 1 -
cli/QuickstepCli.cpp | 21 +-
cli/tests/CMakeLists.txt | 2 +-
cli/tests/CommandExecutorTestRunner.cpp | 2 +-
cli/tests/CommandExecutorTestRunner.hpp | 15 +-
query_execution/CMakeLists.txt | 28 +-
query_execution/Foreman.cpp | 255 ------------------
query_execution/Foreman.hpp | 140 ----------
query_execution/ForemanBase.hpp | 85 ++++++
query_execution/ForemanLite.hpp | 85 ------
query_execution/ForemanSingleNode.cpp | 256 +++++++++++++++++++
query_execution/ForemanSingleNode.hpp | 140 ++++++++++
query_optimizer/tests/CMakeLists.txt | 2 +-
.../tests/ExecutionGeneratorTestRunner.cpp | 2 +-
.../tests/ExecutionGeneratorTestRunner.hpp | 15 +-
18 files changed, 527 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index de6754a..042c050 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -756,7 +756,7 @@ target_link_libraries(quickstep_cli_shell
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..9637055 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -89,7 +89,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
- quickstep_queryexecution_Foreman
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryPlan
quickstep_queryoptimizer_QueryProcessor
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7083ef5..8acfae8 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -34,7 +34,6 @@
#include "parser/ParseStatement.hpp"
#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/QueryProcessor.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index 3435aeb..19d03e6 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -32,7 +32,6 @@ namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogDatabase;
-class Foreman;
class ParseStatement;
class QueryProcessor;
class StorageManager;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 02a55a0..68a3599 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -58,7 +58,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
#include "parser/ParseStatement.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
@@ -104,7 +104,7 @@ using quickstep::AdmitRequestMessage;
using quickstep::CatalogRelation;
using quickstep::DefaultsConfigurator;
using quickstep::DropRelation;
-using quickstep::Foreman;
+using quickstep::ForemanSingleNode;
using quickstep::InputParserUtil;
using quickstep::MessageBusImpl;
using quickstep::MessageStyle;
@@ -353,14 +353,15 @@ int main(int argc, char* argv[]) {
worker_client_ids,
worker_numa_nodes);
- Foreman foreman(main_thread_client_id,
- &worker_directory,
- &bus,
- query_processor->getDefaultDatabase(),
- query_processor->getStorageManager(),
- -1, // Don't pin the Foreman thread.
- num_numa_nodes_system,
- quickstep::FLAGS_profile_and_report_workorder_perf);
+ ForemanSingleNode foreman(
+ main_thread_client_id,
+ &worker_directory,
+ &bus,
+ query_processor->getDefaultDatabase(),
+ query_processor->getStorageManager(),
+ -1, // Don't pin the Foreman thread.
+ num_numa_nodes_system,
+ quickstep::FLAGS_profile_and_report_workorder_perf);
// Start the worker threads.
for (Worker &worker : workers) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index d177d6c..7da56d1 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -33,7 +33,7 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 794f7e1..bd7082f 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -27,7 +27,7 @@
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp
index 8fb5b65..69692ae 100644
--- a/cli/tests/CommandExecutorTestRunner.hpp
+++ b/cli/tests/CommandExecutorTestRunner.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
@@ -77,11 +77,12 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_.reset(new Foreman(main_thread_client_id_,
- workers_.get(),
- &bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ foreman_.reset(
+ new ForemanSingleNode(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
foreman_->start();
worker_->start();
@@ -104,7 +105,7 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
tmb::client_id main_thread_client_id_;
MessageBusImpl bus_;
- std::unique_ptr<Foreman> foreman_;
+ std::unique_ptr<ForemanSingleNode> foreman_;
std::unique_ptr<Worker> worker_;
std::unique_ptr<WorkerDirectory> workers_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..2be451c 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,6 +1,6 @@
# Copyright 2011-2015 Quickstep Technologies LLC.
# Copyright 2015-2016 Pivotal Software, Inc.
-# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
# University of Wisconsin\u2014Madison.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,8 +32,8 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
-add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
-add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
@@ -69,11 +69,15 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_Macros
tmb)
endif()
-target_link_libraries(quickstep_queryexecution_Foreman
- ${GFLAGS_LIB_NAME}
+target_link_libraries(quickstep_queryexecution_ForemanBase
+ glog
+ quickstep_threading_Thread
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
@@ -82,12 +86,8 @@ target_link_libraries(quickstep_queryexecution_Foreman
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
- tmb)
-target_link_libraries(quickstep_queryexecution_ForemanLite
- glog
- quickstep_threading_Thread
- quickstep_utility_Macros
- tmb)
+ tmb
+ ${GFLAGS_LIB_NAME})
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
@@ -199,8 +199,8 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
- quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
deleted file mode 100644
index 98146e2..0000000
--- a/query_execution/Foreman.cpp
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/Foreman.hpp"
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
-#include "utility/Macros.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-namespace quickstep {
-
-DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
- "of pending work orders for the worker. This information is used "
- "by the Foreman to assign work orders to worker threads");
-
-Foreman::Foreman(const tmb::client_id main_thread_client_id,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- const int cpu_id,
- const size_t num_numa_nodes,
- const bool profile_individual_workorders)
- : ForemanLite(bus, cpu_id),
- main_thread_client_id_(main_thread_client_id),
- worker_directory_(DCHECK_NOTNULL(worker_directory)),
- catalog_database_(DCHECK_NOTNULL(catalog_database)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
- const std::vector<QueryExecutionMessageType> sender_message_types{
- kPoisonMessage,
- kRebuildWorkOrderMessage,
- kWorkOrderMessage,
- kWorkloadCompletionMessage};
-
- for (const auto message_type : sender_message_types) {
- bus_->RegisterClientAsSender(foreman_client_id_, message_type);
- }
-
- const std::vector<QueryExecutionMessageType> receiver_message_types{
- kAdmitRequestMessage,
- kCatalogRelationNewBlockMessage,
- kDataPipelineMessage,
- kPoisonMessage,
- kRebuildWorkOrderCompleteMessage,
- kWorkOrderFeedbackMessage,
- kWorkOrdersAvailableMessage,
- kWorkOrderCompleteMessage};
-
- for (const auto message_type : receiver_message_types) {
- bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
- }
-
- policy_enforcer_.reset(new PolicyEnforcer(
- foreman_client_id_,
- num_numa_nodes,
- catalog_database_,
- storage_manager_,
- worker_directory_,
- bus_,
- profile_individual_workorders));
-}
-
-void Foreman::run() {
- if (cpu_id_ >= 0) {
- // We can pin the foreman thread to a CPU if specified.
- ThreadUtil::BindToCPU(cpu_id_);
- }
-
- // Event loop
- for (;;) {
- // Receive() causes this thread to sleep until next message is received.
- const AnnotatedMessage annotated_msg =
- bus_->Receive(foreman_client_id_, 0, true);
- const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- const tmb::message_type_id message_type = tagged_message.message_type();
- switch (message_type) {
- case kCatalogRelationNewBlockMessage: // Fall through
- case kDataPipelineMessage:
- case kRebuildWorkOrderCompleteMessage:
- case kWorkOrderCompleteMessage:
- case kWorkOrderFeedbackMessage:
- case kWorkOrdersAvailableMessage: {
- policy_enforcer_->processMessage(tagged_message);
- break;
- }
-
- case kAdmitRequestMessage: {
- const AdmitRequestMessage *msg =
- static_cast<const AdmitRequestMessage *>(tagged_message.message());
- const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
-
- DCHECK(!query_handles.empty());
- bool all_queries_admitted = true;
- if (query_handles.size() == 1u) {
- all_queries_admitted =
- policy_enforcer_->admitQuery(query_handles.front());
- } else {
- all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
- }
- if (!all_queries_admitted) {
- LOG(WARNING) << "The scheduler could not admit all the queries";
- // TODO(harshad) - Inform the main thread about the failure.
- }
- break;
- }
- case kPoisonMessage: {
- if (policy_enforcer_->hasQueries()) {
- LOG(WARNING) << "Foreman thread exiting while some queries are "
- "under execution or waiting to be admitted";
- }
- return;
- }
- default:
- LOG(FATAL) << "Unknown message type to Foreman";
- }
-
- if (canCollectNewMessages(message_type)) {
- vector<unique_ptr<WorkerMessage>> new_messages;
- policy_enforcer_->getWorkerMessages(&new_messages);
- dispatchWorkerMessages(new_messages);
- }
-
- // We check again, as some queries may produce zero work orders and finish
- // their execution.
- if (!policy_enforcer_->hasQueries()) {
- // Signal the main thread that there are no queries to be executed.
- // Currently the message doesn't have any real content.
- const int dummy_payload = 0;
- TaggedMessage completion_tagged_message(
- &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(
- bus_,
- foreman_client_id_,
- main_thread_client_id_,
- move(completion_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to main thread with TMB client ID"
- << main_thread_client_id_;
- }
- }
-}
-
-bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
- if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
- kCatalogRelationNewBlockMessage,
- kWorkOrderFeedbackMessage)) {
- return false;
- } else if (worker_directory_->getLeastLoadedWorker().second <=
- FLAGS_min_load_per_worker) {
- // If the least loaded worker has only one pending work order, we should
- // collect new messages and dispatch them.
- return true;
- } else {
- return false;
- }
-}
-
-void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
- for (const auto &message : messages) {
- DCHECK(message != nullptr);
- const int recipient_worker_thread_index = message->getRecipientHint();
- if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
- sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
- *message);
- worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
- } else {
- const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
- sendWorkerMessage(least_loaded_worker_thread_index, *message);
- worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
- }
- }
-}
-
-void Foreman::sendWorkerMessage(const size_t worker_thread_index,
- const WorkerMessage &message) {
- tmb::message_type_id type;
- if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
- type = kRebuildWorkOrderMessage;
- } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
- type = kWorkOrderMessage;
- } else {
- FATAL_ERROR("Invalid WorkerMessageType");
- }
- TaggedMessage worker_tagged_message(&message, sizeof(message), type);
-
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- worker_directory_->getClientID(worker_thread_index),
- move(worker_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to Foreman with TMB client ID "
- << worker_directory_->getClientID(worker_thread_index);
-}
-
-void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const {
- const std::vector<
- std::tuple<std::size_t, std::size_t, std::size_t>>
- &recorded_times = policy_enforcer_->getProfilingResults(query_id);
- fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
- for (auto workorder_entry : recorded_times) {
- // Note: Index of the "worker thread index" in the tuple is 0.
- const std::size_t worker_id = std::get<0>(workorder_entry);
- fprintf(out,
- "%lu,%lu,%d,%lu,%lu\n",
- query_id,
- worker_id,
- worker_directory_->getNUMANode(worker_id),
- std::get<1>(workorder_entry), // Operator ID.
- std::get<2>(workorder_entry)); // Time.
- }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
deleted file mode 100644
index 7be57e7..0000000
--- a/query_execution/Foreman.hpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <vector>
-
-#include "query_execution/ForemanLite.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class StorageManager;
-class WorkerDirectory;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief The Foreman receives queries from the main thread, messages from the
- * policy enforcer and dispatches the work to worker threads. It also
- * receives work completion messages from workers.
- **/
-class Foreman final : public ForemanLite {
- public:
- /**
- * @brief Constructor.
- *
- * @param main_thread_client_id The TMB client ID of the main thread.
- * @param worker_directory The worker directory.
- * @param bus A pointer to the TMB.
- * @param catalog_database The catalog database where this query is executed.
- * @param storage_manager The StorageManager to use.
- * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
- * @param num_numa_nodes The number of NUMA nodes in the system.
- * @param profile_individual_workorders Whether every workorder's execution
- * be profiled or not.
- *
- * @note If cpu_id is not specified, Foreman thread can be possibly moved
- * around on different CPUs by the OS.
- **/
- Foreman(const tmb::client_id main_thread_client_id,
- WorkerDirectory *worker_directory,
- tmb::MessageBus *bus,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- const int cpu_id = -1,
- const std::size_t num_numa_nodes = 1,
- const bool profile_individual_workorders = false);
-
- ~Foreman() override {}
-
- /**
- * @brief Print the results of profiling individual work orders for a given
- * query.
- *
- * TODO(harshad) - Add the name of the operator to the output.
- * TODO(harshad) - Add the CPU core ID of the operator to the output. This
- * will require modifying the WorkerDirectory to remember worker affinities.
- * Until then, the users can refer to the worker_affinities provided to the
- * cli to infer the CPU core ID where a given worker is pinned.
- *
- * @param query_id The ID of the query for which the results are to be printed.
- * @param out The file stream.
- **/
- void printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const;
-
- protected:
- void run() override;
-
- private:
- /**
- * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
- * worker threads.
- *
- * @param messages The messages to be dispatched.
- **/
- void dispatchWorkerMessages(
- const std::vector<std::unique_ptr<WorkerMessage>> &messages);
-
- /**
- * @brief Send the given message to the specified worker.
- *
- * @param worker_thread_index The logical index of the recipient worker thread
- * in WorkerDirectory.
- * @param message The WorkerMessage to be sent.
- **/
- void sendWorkerMessage(const std::size_t worker_thread_index,
- const WorkerMessage &message);
-
- /**
- * @brief Check if we can collect new messages from the PolicyEnforcer.
- *
- * @param message_type The type of the last received message.
- **/
- bool canCollectNewMessages(const tmb::message_type_id message_type);
-
- const tmb::client_id main_thread_client_id_;
-
- WorkerDirectory *worker_directory_;
-
- CatalogDatabaseLite *catalog_database_;
- StorageManager *storage_manager_;
-
- std::unique_ptr<PolicyEnforcer> policy_enforcer_;
-
- DISALLOW_COPY_AND_ASSIGN(Foreman);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
new file mode 100644
index 0000000..274b8fc
--- /dev/null
+++ b/query_execution/ForemanBase.hpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A base class that Foreman implements. This class is used to derive
+ * for implementations for both the single-node and distributed versions.
+ **/
+class ForemanBase : public Thread {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param bus A pointer to the TMB.
+ * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+ *
+ * @note If cpu_id is not specified, Foreman thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ ForemanBase(tmb::MessageBus *bus,
+ const int cpu_id)
+ : bus_(DCHECK_NOTNULL(bus)),
+ cpu_id_(cpu_id) {
+ foreman_client_id_ = bus_->Connect();
+ }
+
+ ~ForemanBase() override {}
+
+ /**
+ * @brief Get the TMB client ID of Foreman thread.
+ *
+ * @return TMB client ID of foreman thread.
+ **/
+ tmb::client_id getBusClientID() const {
+ return foreman_client_id_;
+ }
+
+ protected:
+ void run() override = 0;
+
+ tmb::MessageBus *bus_;
+
+ tmb::client_id foreman_client_id_;
+
+ // The ID of the CPU that the Foreman thread can optionally be pinned to.
+ const int cpu_id_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ForemanBase);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanLite.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanLite.hpp b/query_execution/ForemanLite.hpp
deleted file mode 100644
index cb6cdf3..0000000
--- a/query_execution/ForemanLite.hpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A base class that Foreman implements. This class is used to derive
- * for implementations for both the single-node and distributed versions.
- **/
-class ForemanLite : public Thread {
- public:
- /**
- * @brief Constructor.
- *
- * @param bus A pointer to the TMB.
- * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
- *
- * @note If cpu_id is not specified, Foreman thread can be possibly moved
- * around on different CPUs by the OS.
- **/
- ForemanLite(tmb::MessageBus *bus,
- const int cpu_id)
- : bus_(DCHECK_NOTNULL(bus)),
- cpu_id_(cpu_id) {
- foreman_client_id_ = bus_->Connect();
- }
-
- ~ForemanLite() override {}
-
- /**
- * @brief Get the TMB client ID of Foreman thread.
- *
- * @return TMB client ID of foreman thread.
- **/
- tmb::client_id getBusClientID() const {
- return foreman_client_id_;
- }
-
- protected:
- void run() override = 0;
-
- tmb::MessageBus *bus_;
-
- tmb::client_id foreman_client_id_;
-
- // The ID of the CPU that the Foreman thread can optionally be pinned to.
- const int cpu_id_;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(ForemanLite);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
new file mode 100644
index 0000000..3aa1f0b
--- /dev/null
+++ b/query_execution/ForemanSingleNode.cpp
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanSingleNode.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+namespace quickstep {
+
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+ "of pending work orders for the worker. This information is used "
+ "by the Foreman to assign work orders to worker threads");
+
+ForemanSingleNode::ForemanSingleNode(
+ const tmb::client_id main_thread_client_id,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ const int cpu_id,
+ const size_t num_numa_nodes,
+ const bool profile_individual_workorders)
+ : ForemanBase(bus, cpu_id),
+ main_thread_client_id_(main_thread_client_id),
+ worker_directory_(DCHECK_NOTNULL(worker_directory)),
+ catalog_database_(DCHECK_NOTNULL(catalog_database)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ const std::vector<QueryExecutionMessageType> sender_message_types{
+ kPoisonMessage,
+ kRebuildWorkOrderMessage,
+ kWorkOrderMessage,
+ kWorkloadCompletionMessage};
+
+ for (const auto message_type : sender_message_types) {
+ bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+ }
+
+ const std::vector<QueryExecutionMessageType> receiver_message_types{
+ kAdmitRequestMessage,
+ kCatalogRelationNewBlockMessage,
+ kDataPipelineMessage,
+ kPoisonMessage,
+ kRebuildWorkOrderCompleteMessage,
+ kWorkOrderFeedbackMessage,
+ kWorkOrdersAvailableMessage,
+ kWorkOrderCompleteMessage};
+
+ for (const auto message_type : receiver_message_types) {
+ bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+ }
+
+ policy_enforcer_.reset(new PolicyEnforcer(
+ foreman_client_id_,
+ num_numa_nodes,
+ catalog_database_,
+ storage_manager_,
+ worker_directory_,
+ bus_,
+ profile_individual_workorders));
+}
+
+void ForemanSingleNode::run() {
+ if (cpu_id_ >= 0) {
+ // We can pin the foreman thread to a CPU if specified.
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ // Event loop
+ for (;;) {
+ // Receive() causes this thread to sleep until next message is received.
+ const AnnotatedMessage annotated_msg =
+ bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ switch (message_type) {
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrdersAvailableMessage: {
+ policy_enforcer_->processMessage(tagged_message);
+ break;
+ }
+
+ case kAdmitRequestMessage: {
+ const AdmitRequestMessage *msg =
+ static_cast<const AdmitRequestMessage *>(tagged_message.message());
+ const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
+ DCHECK(!query_handles.empty());
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
+ }
+ break;
+ }
+ case kPoisonMessage: {
+ if (policy_enforcer_->hasQueries()) {
+ LOG(WARNING) << "Foreman thread exiting while some queries are "
+ "under execution or waiting to be admitted";
+ }
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type to Foreman";
+ }
+
+ if (canCollectNewMessages(message_type)) {
+ vector<unique_ptr<WorkerMessage>> new_messages;
+ policy_enforcer_->getWorkerMessages(&new_messages);
+ dispatchWorkerMessages(new_messages);
+ }
+
+ // We check again, as some queries may produce zero work orders and finish
+ // their execution.
+ if (!policy_enforcer_->hasQueries()) {
+ // Signal the main thread that there are no queries to be executed.
+ // Currently the message doesn't have any real content.
+ const int dummy_payload = 0;
+ TaggedMessage completion_tagged_message(
+ &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(
+ bus_,
+ foreman_client_id_,
+ main_thread_client_id_,
+ move(completion_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID "
+ << foreman_client_id_ << " to main thread with TMB client ID"
+ << main_thread_client_id_;
+ }
+ }
+}
+
+bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
+ if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+ kCatalogRelationNewBlockMessage,
+ kWorkOrderFeedbackMessage)) {
+ return false;
+ } else if (worker_directory_->getLeastLoadedWorker().second <=
+ FLAGS_min_load_per_worker) {
+ // If the least loaded worker has only one pending work order, we should
+ // collect new messages and dispatch them.
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+ for (const auto &message : messages) {
+ DCHECK(message != nullptr);
+ const int recipient_worker_thread_index = message->getRecipientHint();
+ if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
+ sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
+ *message);
+ worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
+ } else {
+ const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+ sendWorkerMessage(least_loaded_worker_thread_index, *message);
+ worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
+ }
+ }
+}
+
+void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
+ const WorkerMessage &message) {
+ tmb::message_type_id type;
+ if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
+ type = kRebuildWorkOrderMessage;
+ } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
+ type = kWorkOrderMessage;
+ } else {
+ FATAL_ERROR("Invalid WorkerMessageType");
+ }
+ TaggedMessage worker_tagged_message(&message, sizeof(message), type);
+
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ worker_directory_->getClientID(worker_thread_index),
+ move(worker_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+ "Message could not be sent from Foreman with TMB client ID "
+ << foreman_client_id_ << " to Foreman with TMB client ID "
+ << worker_directory_->getClientID(worker_thread_index);
+}
+
+void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const {
+ const std::vector<
+ std::tuple<std::size_t, std::size_t, std::size_t>>
+ &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+ fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
+ for (auto workorder_entry : recorded_times) {
+ // Note: Index of the "worker thread index" in the tuple is 0.
+ const std::size_t worker_id = std::get<0>(workorder_entry);
+ fprintf(out,
+ "%lu,%lu,%d,%lu,%lu\n",
+ query_id,
+ worker_id,
+ worker_directory_->getNUMANode(worker_id),
+ std::get<1>(workorder_entry), // Operator ID.
+ std::get<2>(workorder_entry)); // Time.
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
new file mode 100644
index 0000000..7506d35
--- /dev/null
+++ b/query_execution/ForemanSingleNode.hpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ * policy enforcer and dispatches the work to worker threads. It also
+ * receives work completion messages from workers.
+ **/
+class ForemanSingleNode final : public ForemanBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param main_thread_client_id The TMB client ID of the main thread.
+ * @param worker_directory The worker directory.
+ * @param bus A pointer to the TMB.
+ * @param catalog_database The catalog database where this query is executed.
+ * @param storage_manager The StorageManager to use.
+ * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+ * @param num_numa_nodes The number of NUMA nodes in the system.
+ * @param profile_individual_workorders Whether every workorder's execution
+ * be profiled or not.
+ *
+ * @note If cpu_id is not specified, Foreman thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ ForemanSingleNode(const tmb::client_id main_thread_client_id,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ const int cpu_id = -1,
+ const std::size_t num_numa_nodes = 1,
+ const bool profile_individual_workorders = false);
+
+ ~ForemanSingleNode() override {}
+
+ /**
+ * @brief Print the results of profiling individual work orders for a given
+ * query.
+ *
+ * TODO(harshad) - Add the name of the operator to the output.
+ * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+ * will require modifying the WorkerDirectory to remember worker affinities.
+ * Until then, the users can refer to the worker_affinities provided to the
+ * cli to infer the CPU core ID where a given worker is pinned.
+ *
+ * @param query_id The ID of the query for which the results are to be printed.
+ * @param out The file stream.
+ **/
+ void printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const;
+
+ protected:
+ void run() override;
+
+ private:
+ /**
+ * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
+ * worker threads.
+ *
+ * @param messages The messages to be dispatched.
+ **/
+ void dispatchWorkerMessages(
+ const std::vector<std::unique_ptr<WorkerMessage>> &messages);
+
+ /**
+ * @brief Send the given message to the specified worker.
+ *
+ * @param worker_thread_index The logical index of the recipient worker thread
+ * in WorkerDirectory.
+ * @param message The WorkerMessage to be sent.
+ **/
+ void sendWorkerMessage(const std::size_t worker_thread_index,
+ const WorkerMessage &message);
+
+ /**
+ * @brief Check if we can collect new messages from the PolicyEnforcer.
+ *
+ * @param message_type The type of the last received message.
+ **/
+ bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+ const tmb::client_id main_thread_client_id_;
+
+ WorkerDirectory *worker_directory_;
+
+ CatalogDatabaseLite *catalog_database_;
+ StorageManager *storage_manager_;
+
+ std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+
+ DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 5b58f75..9cad47f 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -117,7 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
- quickstep_queryexecution_Foreman
+ quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 8c1d306..563a777 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -25,7 +25,7 @@
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index bb2a26f..d1d9380 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Worker.hpp"
#include "query_execution/WorkerDirectory.hpp"
@@ -80,11 +80,12 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_.reset(new Foreman(main_thread_client_id_,
- workers_.get(),
- &bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ foreman_.reset(
+ new ForemanSingleNode(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
foreman_->start();
worker_->start();
@@ -105,7 +106,7 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
TestDatabaseLoader test_database_loader_;
MessageBusImpl bus_;
- std::unique_ptr<Foreman> foreman_;
+ std::unique_ptr<ForemanSingleNode> foreman_;
std::unique_ptr<Worker> worker_;
std::unique_ptr<WorkerDirectory> workers_;