You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/29 23:44:13 UTC

[10/25] incubator-quickstep git commit: Renamed Foreman related classes.

Renamed Foreman related classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4fb884c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4fb884c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4fb884c1

Branch: refs/heads/dist-exe-test-new
Commit: 4fb884c1bef53b00ec3e0362b8de401b7c0b07f6
Parents: a37bf26
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Jul 7 14:13:19 2016 -0500
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Jul 29 16:42:10 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 cli/CMakeLists.txt                              |   1 -
 cli/CommandExecutor.cpp                         |   1 -
 cli/CommandExecutor.hpp                         |   1 -
 cli/QuickstepCli.cpp                            |  21 +-
 cli/tests/CMakeLists.txt                        |   2 +-
 cli/tests/CommandExecutorTestRunner.cpp         |   2 +-
 cli/tests/CommandExecutorTestRunner.hpp         |  15 +-
 query_execution/CMakeLists.txt                  |  28 +-
 query_execution/Foreman.cpp                     | 255 ------------------
 query_execution/Foreman.hpp                     | 140 ----------
 query_execution/ForemanBase.hpp                 |  85 ++++++
 query_execution/ForemanLite.hpp                 |  85 ------
 query_execution/ForemanSingleNode.cpp           | 256 +++++++++++++++++++
 query_execution/ForemanSingleNode.hpp           | 140 ++++++++++
 query_optimizer/tests/CMakeLists.txt            |   2 +-
 .../tests/ExecutionGeneratorTestRunner.cpp      |   2 +-
 .../tests/ExecutionGeneratorTestRunner.hpp      |  15 +-
 18 files changed, 527 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index de6754a..042c050 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -756,7 +756,7 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_Foreman
+                      quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..9637055 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -89,7 +89,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
-                      quickstep_queryexecution_Foreman
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_queryoptimizer_QueryProcessor

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7083ef5..8acfae8 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -34,7 +34,6 @@
 #include "parser/ParseStatement.hpp"
 #include "parser/ParseString.hpp"
 #include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/QueryProcessor.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index 3435aeb..19d03e6 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -32,7 +32,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 
 class CatalogDatabase;
-class Foreman;
 class ParseStatement;
 class QueryProcessor;
 class StorageManager;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 02a55a0..68a3599 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -58,7 +58,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "parser/ParseStatement.hpp"
 #include "parser/SqlParserWrapper.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
@@ -104,7 +104,7 @@ using quickstep::AdmitRequestMessage;
 using quickstep::CatalogRelation;
 using quickstep::DefaultsConfigurator;
 using quickstep::DropRelation;
-using quickstep::Foreman;
+using quickstep::ForemanSingleNode;
 using quickstep::InputParserUtil;
 using quickstep::MessageBusImpl;
 using quickstep::MessageStyle;
@@ -353,14 +353,15 @@ int main(int argc, char* argv[]) {
                                    worker_client_ids,
                                    worker_numa_nodes);
 
-  Foreman foreman(main_thread_client_id,
-                  &worker_directory,
-                  &bus,
-                  query_processor->getDefaultDatabase(),
-                  query_processor->getStorageManager(),
-                  -1,  // Don't pin the Foreman thread.
-                  num_numa_nodes_system,
-                  quickstep::FLAGS_profile_and_report_workorder_perf);
+  ForemanSingleNode foreman(
+      main_thread_client_id,
+      &worker_directory,
+      &bus,
+      query_processor->getDefaultDatabase(),
+      query_processor->getStorageManager(),
+      -1,  // Don't pin the Foreman thread.
+      num_numa_nodes_system,
+      quickstep::FLAGS_profile_and_report_workorder_perf);
 
   // Start the worker threads.
   for (Worker &worker : workers) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index d177d6c..7da56d1 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -33,7 +33,7 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_Foreman
+                      quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 794f7e1..bd7082f 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -27,7 +27,7 @@
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_optimizer/ExecutionGenerator.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp
index 8fb5b65..69692ae 100644
--- a/cli/tests/CommandExecutorTestRunner.hpp
+++ b/cli/tests/CommandExecutorTestRunner.hpp
@@ -25,7 +25,7 @@
 #include <vector>
 
 #include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
@@ -77,11 +77,12 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
 
     workers_.reset(new WorkerDirectory(1 /* number of workers */,
                                        worker_client_ids, numa_nodes));
-    foreman_.reset(new Foreman(main_thread_client_id_,
-                               workers_.get(),
-                               &bus_,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager()));
+    foreman_.reset(
+        new ForemanSingleNode(main_thread_client_id_,
+                              workers_.get(),
+                              &bus_,
+                              test_database_loader_.catalog_database(),
+                              test_database_loader_.storage_manager()));
 
     foreman_->start();
     worker_->start();
@@ -104,7 +105,7 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
   tmb::client_id main_thread_client_id_;
 
   MessageBusImpl bus_;
-  std::unique_ptr<Foreman> foreman_;
+  std::unique_ptr<ForemanSingleNode> foreman_;
   std::unique_ptr<Worker> worker_;
 
   std::unique_ptr<WorkerDirectory> workers_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..2be451c 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,6 +1,6 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
-#   Copyright 2016, Quickstep Research Group, Computer Sciences Department, 
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
 #     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,8 +32,8 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
-add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
-add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -69,11 +69,15 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_Macros
                         tmb)
 endif()
-target_link_libraries(quickstep_queryexecution_Foreman
-                      ${GFLAGS_LIB_NAME} 
+target_link_libraries(quickstep_queryexecution_ForemanBase
+                      glog
+                      quickstep_threading_Thread
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_ForemanBase
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
@@ -82,12 +86,8 @@ target_link_libraries(quickstep_queryexecution_Foreman
                       quickstep_threading_ThreadUtil
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
-                      tmb)
-target_link_libraries(quickstep_queryexecution_ForemanLite
-                      glog
-                      quickstep_threading_Thread
-                      quickstep_utility_Macros
-                      tmb)
+                      tmb
+                      ${GFLAGS_LIB_NAME})
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
@@ -199,8 +199,8 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
 add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
 target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_Foreman
-                      quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_ForemanBase
+                      quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
deleted file mode 100644
index 98146e2..0000000
--- a/query_execution/Foreman.cpp
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015-2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include "query_execution/Foreman.hpp"
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
-#include "utility/Macros.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-using std::vector;
-
-namespace quickstep {
-
-DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
-              "of pending work orders for the worker. This information is used "
-              "by the Foreman to assign work orders to worker threads");
-
-Foreman::Foreman(const tmb::client_id main_thread_client_id,
-                 WorkerDirectory *worker_directory,
-                 tmb::MessageBus *bus,
-                 CatalogDatabaseLite *catalog_database,
-                 StorageManager *storage_manager,
-                 const int cpu_id,
-                 const size_t num_numa_nodes,
-                 const bool profile_individual_workorders)
-    : ForemanLite(bus, cpu_id),
-      main_thread_client_id_(main_thread_client_id),
-      worker_directory_(DCHECK_NOTNULL(worker_directory)),
-      catalog_database_(DCHECK_NOTNULL(catalog_database)),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
-  const std::vector<QueryExecutionMessageType> sender_message_types{
-      kPoisonMessage,
-      kRebuildWorkOrderMessage,
-      kWorkOrderMessage,
-      kWorkloadCompletionMessage};
-
-  for (const auto message_type : sender_message_types) {
-    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
-  }
-
-  const std::vector<QueryExecutionMessageType> receiver_message_types{
-      kAdmitRequestMessage,
-      kCatalogRelationNewBlockMessage,
-      kDataPipelineMessage,
-      kPoisonMessage,
-      kRebuildWorkOrderCompleteMessage,
-      kWorkOrderFeedbackMessage,
-      kWorkOrdersAvailableMessage,
-      kWorkOrderCompleteMessage};
-
-  for (const auto message_type : receiver_message_types) {
-    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
-  }
-
-  policy_enforcer_.reset(new PolicyEnforcer(
-      foreman_client_id_,
-      num_numa_nodes,
-      catalog_database_,
-      storage_manager_,
-      worker_directory_,
-      bus_,
-      profile_individual_workorders));
-}
-
-void Foreman::run() {
-  if (cpu_id_ >= 0) {
-    // We can pin the foreman thread to a CPU if specified.
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  // Event loop
-  for (;;) {
-    // Receive() causes this thread to sleep until next message is received.
-    const AnnotatedMessage annotated_msg =
-        bus_->Receive(foreman_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    const tmb::message_type_id message_type = tagged_message.message_type();
-    switch (message_type) {
-      case kCatalogRelationNewBlockMessage:  // Fall through
-      case kDataPipelineMessage:
-      case kRebuildWorkOrderCompleteMessage:
-      case kWorkOrderCompleteMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrdersAvailableMessage: {
-        policy_enforcer_->processMessage(tagged_message);
-        break;
-      }
-
-      case kAdmitRequestMessage: {
-        const AdmitRequestMessage *msg =
-            static_cast<const AdmitRequestMessage *>(tagged_message.message());
-        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
-
-        DCHECK(!query_handles.empty());
-        bool all_queries_admitted = true;
-        if (query_handles.size() == 1u) {
-          all_queries_admitted =
-              policy_enforcer_->admitQuery(query_handles.front());
-        } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
-        }
-        if (!all_queries_admitted) {
-          LOG(WARNING) << "The scheduler could not admit all the queries";
-          // TODO(harshad) - Inform the main thread about the failure.
-        }
-        break;
-      }
-      case kPoisonMessage: {
-        if (policy_enforcer_->hasQueries()) {
-          LOG(WARNING) << "Foreman thread exiting while some queries are "
-                          "under execution or waiting to be admitted";
-        }
-        return;
-      }
-      default:
-        LOG(FATAL) << "Unknown message type to Foreman";
-    }
-
-    if (canCollectNewMessages(message_type)) {
-      vector<unique_ptr<WorkerMessage>> new_messages;
-      policy_enforcer_->getWorkerMessages(&new_messages);
-      dispatchWorkerMessages(new_messages);
-    }
-
-    // We check again, as some queries may produce zero work orders and finish
-    // their execution.
-    if (!policy_enforcer_->hasQueries()) {
-      // Signal the main thread that there are no queries to be executed.
-      // Currently the message doesn't have any real content.
-      const int dummy_payload = 0;
-      TaggedMessage completion_tagged_message(
-          &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
-      const tmb::MessageBus::SendStatus send_status =
-          QueryExecutionUtil::SendTMBMessage(
-              bus_,
-              foreman_client_id_,
-              main_thread_client_id_,
-              move(completion_tagged_message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-          << "Message could not be sent from Foreman with TMB client ID "
-          << foreman_client_id_ << " to main thread with TMB client ID"
-          << main_thread_client_id_;
-    }
-  }
-}
-
-bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
-  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
-                                    kCatalogRelationNewBlockMessage,
-                                    kWorkOrderFeedbackMessage)) {
-    return false;
-  } else if (worker_directory_->getLeastLoadedWorker().second <=
-             FLAGS_min_load_per_worker) {
-    // If the least loaded worker has only one pending work order, we should
-    // collect new messages and dispatch them.
-    return true;
-  } else {
-    return false;
-  }
-}
-
-void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
-  for (const auto &message : messages) {
-    DCHECK(message != nullptr);
-    const int recipient_worker_thread_index = message->getRecipientHint();
-    if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
-      sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
-                        *message);
-      worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
-    } else {
-      const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
-      sendWorkerMessage(least_loaded_worker_thread_index, *message);
-      worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
-    }
-  }
-}
-
-void Foreman::sendWorkerMessage(const size_t worker_thread_index,
-                                const WorkerMessage &message) {
-  tmb::message_type_id type;
-  if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
-    type = kRebuildWorkOrderMessage;
-  } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
-    type = kWorkOrderMessage;
-  } else {
-    FATAL_ERROR("Invalid WorkerMessageType");
-  }
-  TaggedMessage worker_tagged_message(&message, sizeof(message), type);
-
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         worker_directory_->getClientID(worker_thread_index),
-                                         move(worker_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-      "Message could not be sent from Foreman with TMB client ID "
-      << foreman_client_id_ << " to Foreman with TMB client ID "
-      << worker_directory_->getClientID(worker_thread_index);
-}
-
-void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
-                                             std::FILE *out) const {
-  const std::vector<
-      std::tuple<std::size_t, std::size_t, std::size_t>>
-      &recorded_times = policy_enforcer_->getProfilingResults(query_id);
-  fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
-  for (auto workorder_entry : recorded_times) {
-    // Note: Index of the "worker thread index" in the tuple is 0.
-    const std::size_t worker_id = std::get<0>(workorder_entry);
-    fprintf(out,
-            "%lu,%lu,%d,%lu,%lu\n",
-            query_id,
-            worker_id,
-            worker_directory_->getNUMANode(worker_id),
-            std::get<1>(workorder_entry),  // Operator ID.
-            std::get<2>(workorder_entry));  // Time.
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
deleted file mode 100644
index 7be57e7..0000000
--- a/query_execution/Foreman.hpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015-2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
-
-#include <cstddef>
-#include <cstdio>
-#include <memory>
-#include <vector>
-
-#include "query_execution/ForemanLite.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class StorageManager;
-class WorkerDirectory;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief The Foreman receives queries from the main thread, messages from the
- *        policy enforcer and dispatches the work to worker threads. It also
- *        receives work completion messages from workers.
- **/
-class Foreman final : public ForemanLite {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param main_thread_client_id The TMB client ID of the main thread.
-   * @param worker_directory The worker directory.
-   * @param bus A pointer to the TMB.
-   * @param catalog_database The catalog database where this query is executed.
-   * @param storage_manager The StorageManager to use.
-   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
-   * @param num_numa_nodes The number of NUMA nodes in the system.
-   * @param profile_individual_workorders Whether every workorder's execution
-   *        be profiled or not.
-   *
-   * @note If cpu_id is not specified, Foreman thread can be possibly moved
-   *       around on different CPUs by the OS.
-  **/
-  Foreman(const tmb::client_id main_thread_client_id,
-          WorkerDirectory *worker_directory,
-          tmb::MessageBus *bus,
-          CatalogDatabaseLite *catalog_database,
-          StorageManager *storage_manager,
-          const int cpu_id = -1,
-          const std::size_t num_numa_nodes = 1,
-          const bool profile_individual_workorders = false);
-
-  ~Foreman() override {}
-
-  /**
-   * @brief Print the results of profiling individual work orders for a given
-   *        query.
-   *
-   * TODO(harshad) - Add the name of the operator to the output.
-   * TODO(harshad) - Add the CPU core ID of the operator to the output. This
-   * will require modifying the WorkerDirectory to remember worker affinities.
-   * Until then, the users can refer to the worker_affinities provided to the
-   * cli to infer the CPU core ID where a given worker is pinned.
-   *
-   * @param query_id The ID of the query for which the results are to be printed.
-   * @param out The file stream.
-   **/
-  void printWorkOrderProfilingResults(const std::size_t query_id,
-                                      std::FILE *out) const;
-
- protected:
-  void run() override;
-
- private:
-  /**
-   * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
-   *        worker threads.
-   *
-   * @param messages The messages to be dispatched.
-   **/
-  void dispatchWorkerMessages(
-      const std::vector<std::unique_ptr<WorkerMessage>> &messages);
-
-  /**
-   * @brief Send the given message to the specified worker.
-   *
-   * @param worker_thread_index The logical index of the recipient worker thread
-   *        in WorkerDirectory.
-   * @param message The WorkerMessage to be sent.
-   **/
-  void sendWorkerMessage(const std::size_t worker_thread_index,
-                         const WorkerMessage &message);
-
-  /**
-   * @brief Check if we can collect new messages from the PolicyEnforcer.
-   *
-   * @param message_type The type of the last received message.
-   **/
-  bool canCollectNewMessages(const tmb::message_type_id message_type);
-
-  const tmb::client_id main_thread_client_id_;
-
-  WorkerDirectory *worker_directory_;
-
-  CatalogDatabaseLite *catalog_database_;
-  StorageManager *storage_manager_;
-
-  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
-
-  DISALLOW_COPY_AND_ASSIGN(Foreman);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
new file mode 100644
index 0000000..274b8fc
--- /dev/null
+++ b/query_execution/ForemanBase.hpp
@@ -0,0 +1,85 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A base class that Foreman implements. This class is used to derive
+ *        for implementations for both the single-node and distributed versions.
+ **/
+class ForemanBase : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+  **/
+  ForemanBase(tmb::MessageBus *bus,
+              const int cpu_id)
+      : bus_(DCHECK_NOTNULL(bus)),
+        cpu_id_(cpu_id) {
+    foreman_client_id_ = bus_->Connect();
+  }
+
+  ~ForemanBase() override {}
+
+  /**
+   * @brief Get the TMB client ID of Foreman thread.
+   *
+   * @return TMB client ID of foreman thread.
+   **/
+  tmb::client_id getBusClientID() const {
+    return foreman_client_id_;
+  }
+
+ protected:
+  void run() override = 0;
+
+  tmb::MessageBus *bus_;
+
+  tmb::client_id foreman_client_id_;
+
+  // The ID of the CPU that the Foreman thread can optionally be pinned to.
+  const int cpu_id_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ForemanBase);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanLite.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanLite.hpp b/query_execution/ForemanLite.hpp
deleted file mode 100644
index cb6cdf3..0000000
--- a/query_execution/ForemanLite.hpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- *   Copyright 2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_
-
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A base class that Foreman implements. This class is used to derive
- *        for implementations for both the single-node and distributed versions.
- **/
-class ForemanLite : public Thread {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param bus A pointer to the TMB.
-   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
-   *
-   * @note If cpu_id is not specified, Foreman thread can be possibly moved
-   *       around on different CPUs by the OS.
-  **/
-  ForemanLite(tmb::MessageBus *bus,
-              const int cpu_id)
-      : bus_(DCHECK_NOTNULL(bus)),
-        cpu_id_(cpu_id) {
-    foreman_client_id_ = bus_->Connect();
-  }
-
-  ~ForemanLite() override {}
-
-  /**
-   * @brief Get the TMB client ID of Foreman thread.
-   *
-   * @return TMB client ID of foreman thread.
-   **/
-  tmb::client_id getBusClientID() const {
-    return foreman_client_id_;
-  }
-
- protected:
-  void run() override = 0;
-
-  tmb::MessageBus *bus_;
-
-  tmb::client_id foreman_client_id_;
-
-  // The ID of the CPU that the Foreman thread can optionally be pinned to.
-  const int cpu_id_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(ForemanLite);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
new file mode 100644
index 0000000..3aa1f0b
--- /dev/null
+++ b/query_execution/ForemanSingleNode.cpp
@@ -0,0 +1,256 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/ForemanSingleNode.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+namespace quickstep {
+
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+              "of pending work orders for the worker. This information is used "
+              "by the Foreman to assign work orders to worker threads");
+
+ForemanSingleNode::ForemanSingleNode(
+    const tmb::client_id main_thread_client_id,
+    WorkerDirectory *worker_directory,
+    tmb::MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    StorageManager *storage_manager,
+    const int cpu_id,
+    const size_t num_numa_nodes,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      main_thread_client_id_(main_thread_client_id),
+      worker_directory_(DCHECK_NOTNULL(worker_directory)),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+  // Message types for which the Foreman registers itself as a sender.
+  const vector<QueryExecutionMessageType> outgoing_types = {
+      kPoisonMessage,
+      kRebuildWorkOrderMessage,
+      kWorkOrderMessage,
+      kWorkloadCompletionMessage};
+  for (const QueryExecutionMessageType outgoing_type : outgoing_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, outgoing_type);
+  }
+
+  // Message types for which the Foreman registers itself as a receiver.
+  const vector<QueryExecutionMessageType> incoming_types = {
+      kAdmitRequestMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kPoisonMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kWorkOrdersAvailableMessage,
+      kWorkOrderCompleteMessage};
+  for (const QueryExecutionMessageType incoming_type : incoming_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, incoming_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcer(
+      foreman_client_id_,
+      num_numa_nodes,
+      catalog_database_,
+      storage_manager_,
+      worker_directory_,
+      bus_,
+      profile_individual_workorders));
+}
+
+void ForemanSingleNode::run() {
+  if (cpu_id_ >= 0) {
+    // A non-negative cpu_id_ requests pinning the foreman thread to that CPU.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  // Event loop: runs until a poison message is received.
+  for (;;) {
+    // Receive() causes this thread to sleep until next message is received.
+    const AnnotatedMessage annotated_msg =
+        bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    switch (message_type) {
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrdersAvailableMessage: {
+        policy_enforcer_->processMessage(tagged_message);
+        break;
+      }
+
+      case kAdmitRequestMessage: {
+        const AdmitRequestMessage *msg =
+            static_cast<const AdmitRequestMessage *>(tagged_message.message());
+        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
+        DCHECK(!query_handles.empty());
+        bool all_queries_admitted = true;
+        if (query_handles.size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles.front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
+        }
+        break;
+      }
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "Foreman thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+        return;
+      }
+      default:
+        LOG(FATAL) << "Unknown message type to Foreman";
+    }
+
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<WorkerMessage>> new_messages;
+      policy_enforcer_->getWorkerMessages(&new_messages);
+      dispatchWorkerMessages(new_messages);
+    }
+
+    // We check again, as some queries may produce zero work orders and finish
+    // their execution.
+    if (!policy_enforcer_->hasQueries()) {
+      // Signal the main thread that there are no queries to be executed.
+      // Currently the message doesn't have any real content.
+      const int dummy_payload = 0;
+      TaggedMessage completion_tagged_message(
+          &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+      const tmb::MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(
+              bus_,
+              foreman_client_id_,
+              main_thread_client_id_,
+              move(completion_tagged_message));
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+          << "Message could not be sent from Foreman with TMB client ID "
+          << foreman_client_id_ << " to main thread with TMB client ID "
+          << main_thread_client_id_;
+    }
+  }
+}
+
+bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
+  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                    kCatalogRelationNewBlockMessage,
+                                    kWorkOrderFeedbackMessage)) {
+    return false;  // These two message types never trigger a collection.
+  } else if (worker_directory_->getLeastLoadedWorker().second <=
+             FLAGS_min_load_per_worker) {
+    // If the least loaded worker has at most FLAGS_min_load_per_worker pending
+    // work orders, we should collect new messages and dispatch them.
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+  for (const auto &worker_message : messages) {
+    DCHECK(worker_message != nullptr);
+    const int hinted_index = worker_message->getRecipientHint();
+    if (hinted_index == WorkerMessage::kInvalidRecipientIndexHint) {
+      const size_t fallback_index = worker_directory_->getLeastLoadedWorker().first;
+      sendWorkerMessage(fallback_index, *worker_message);
+      worker_directory_->incrementNumQueuedWorkOrders(fallback_index);
+    } else {
+      // The message names its recipient; honor that hint.
+      sendWorkerMessage(static_cast<size_t>(hinted_index), *worker_message);
+      worker_directory_->incrementNumQueuedWorkOrders(hinted_index);
+    }
+  }
+}
+
+void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
+                                          const WorkerMessage &message) {
+  tmb::message_type_id type;
+  if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
+    type = kRebuildWorkOrderMessage;
+  } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
+    type = kWorkOrderMessage;
+  } else {
+    FATAL_ERROR("Invalid WorkerMessageType");
+  }
+  TaggedMessage worker_tagged_message(&message, sizeof(message), type);
+  // The recipient is the worker at worker_thread_index in worker_directory_.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         worker_directory_->getClientID(worker_thread_index),
+                                         move(worker_tagged_message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+      "Message could not be sent from Foreman with TMB client ID "
+      << foreman_client_id_ << " to Worker with TMB client ID "
+      << worker_directory_->getClientID(worker_thread_index);
+}
+
+void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                       std::FILE *out) const {
+  const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>
+      &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+  // Emit a CSV header followed by one row per recorded work order.
+  fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    // Note: Index of the "worker thread index" in the tuple is 0.
+    const std::size_t worker_id = std::get<0>(workorder_entry);
+    fprintf(out,
+            "%zu,%zu,%d,%zu,%zu\n",  // %zu is the conversion for std::size_t.
+            query_id,
+            worker_id,
+            worker_directory_->getNUMANode(worker_id),
+            std::get<1>(workorder_entry),  // Operator ID.
+            std::get<2>(workorder_entry));  // Time.
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
new file mode 100644
index 0000000..7506d35
--- /dev/null
+++ b/query_execution/ForemanSingleNode.hpp
@@ -0,0 +1,140 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class StorageManager;
+class WorkerDirectory;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to worker threads. It also
+ *        receives work completion messages from workers.
+ **/
+class ForemanSingleNode final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param main_thread_client_id The TMB client ID of the main thread.
+   * @param worker_directory The worker directory.
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param storage_manager The StorageManager to use.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param num_numa_nodes The number of NUMA nodes in the system.
+   * @param profile_individual_workorders Whether the execution of every
+   *        individual workorder should be profiled.
+   *
+   * @note If cpu_id is negative (the default), the Foreman thread is not
+   *       pinned and can be moved around on different CPUs by the OS.
+   **/
+  ForemanSingleNode(const tmb::client_id main_thread_client_id,
+                    WorkerDirectory *worker_directory,
+                    tmb::MessageBus *bus,
+                    CatalogDatabaseLite *catalog_database,
+                    StorageManager *storage_manager,
+                    const int cpu_id = -1,
+                    const std::size_t num_numa_nodes = 1,
+                    const bool profile_individual_workorders = false);
+
+  ~ForemanSingleNode() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+   * will require modifying the WorkerDirectory to remember worker affinities.
+   * Until then, the users can refer to the worker_affinities provided to the
+   * cli to infer the CPU core ID where a given worker is pinned.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
+   *        worker threads.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkerMessages(
+      const std::vector<std::unique_ptr<WorkerMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified worker.
+   *
+   * @param worker_thread_index The logical index of the recipient worker thread
+   *        in WorkerDirectory.
+   * @param message The WorkerMessage to be sent.
+   **/
+  void sendWorkerMessage(const std::size_t worker_thread_index,
+                         const WorkerMessage &message);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  const tmb::client_id main_thread_client_id_;
+
+  WorkerDirectory *worker_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+  StorageManager *storage_manager_;
+
+  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_SINGLE_NODE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 5b58f75..9cad47f 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -117,7 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_AdmitRequestMessage
-                      quickstep_queryexecution_Foreman
+                      quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 8c1d306..563a777 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -25,7 +25,7 @@
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_optimizer/ExecutionGenerator.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index bb2a26f..d1d9380 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -25,7 +25,7 @@
 #include <vector>
 
 #include "parser/SqlParserWrapper.hpp"
-#include "query_execution/Foreman.hpp"
+#include "query_execution/ForemanSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -80,11 +80,12 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
     workers_.reset(new WorkerDirectory(1 /* number of workers */,
                                        worker_client_ids, numa_nodes));
-    foreman_.reset(new Foreman(main_thread_client_id_,
-                               workers_.get(),
-                               &bus_,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager()));
+    foreman_.reset(
+        new ForemanSingleNode(main_thread_client_id_,
+                              workers_.get(),
+                              &bus_,
+                              test_database_loader_.catalog_database(),
+                              test_database_loader_.storage_manager()));
 
     foreman_->start();
     worker_->start();
@@ -105,7 +106,7 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
   TestDatabaseLoader test_database_loader_;
 
   MessageBusImpl bus_;
-  std::unique_ptr<Foreman> foreman_;
+  std::unique_ptr<ForemanSingleNode> foreman_;
   std::unique_ptr<Worker> worker_;
 
   std::unique_ptr<WorkerDirectory> workers_;