You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 16:13:04 UTC

[1/4] incubator-quickstep git commit: Fix typos in CMakeLists.txt. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/query-manager-used-in-foreman d70ba5d1d -> a690455ec (forced update)


Fix typos in CMakeLists.txt.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fd75e175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fd75e175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fd75e175

Branch: refs/heads/query-manager-used-in-foreman
Commit: fd75e17576118ef539d696e3663691ce12518a3b
Parents: 1be47dc
Author: Navneet Potti <na...@gmail.com>
Authored: Wed Jun 8 16:57:18 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Wed Jun 8 17:21:33 2016 -0500

----------------------------------------------------------------------
 expressions/CMakeLists.txt             | 2 +-
 expressions/aggregation/CMakeLists.txt | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd75e175/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index d8452b2..53ad5d4 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -63,6 +63,6 @@ add_library(quickstep_expressions ../empty_src.cpp ExpressionsModule.hpp)
 target_link_libraries(quickstep_expressions
                       quickstep_expressions_ExpressionFactories
                       quickstep_expressions_Expressions_proto
-                      quickstep_expressions_aggregate
+                      quickstep_expressions_aggregation
                       quickstep_expressions_predicate
                       quickstep_expressions_scalar)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd75e175/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 416c4c6..5744c52 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -266,7 +266,6 @@ target_link_libraries(quickstep_expressions_aggregation
                       quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum
-                      quickstep_expressions_aggregation_AggregationHandleUtil
                       quickstep_expressions_aggregation_AggregationID)
 
 # Tests:


[4/4] incubator-quickstep git commit: Created QueryManager class and tests.

Posted by hb...@apache.org.
Created QueryManager class and tests.

- Responsible for managing execution state, generation of work orders of
  the query and providing work orders for execution.
- Tests for QueryManager, which are simplified versions of the Foreman
  tests.

WorkerMessage enum type change in Foreman.

Replaced DCHECK_NOTNULL with DCHECK.

Query ID can be set for every WorkOrder.

Query ID can now be added to WorkOrders in WorkOrdersContainer.

Added query ID field in Work order completion message.

- Set before the worker sends the completion message.

QueryManager unittest fix.

Added query ID field to messages, query context and InsertDestination.

- WorkOrderCompletion and DataPipelineMessage have query ID field.
- InsertDestination proto and query context have query ID field.
- ExecutionGenerator sets the query ID in InsertDestination.

Added query ID to WorkOrdersAvailable message.

Added query ID to CatalogRelationNewBlockMessage.

Minor style fix in CMakeLists.

Created PolicyEnforcer class.

- First point of entry for queries in the scheduler.
- Can perform admission control.
- Can talk to the QueryManagers of the active queries to provide them
  messages to process and collect work orders for execution from them.

Support for admitting multiple queries to the PolicyEnforcer.

- Function to remove a query made public.

Added a new class for sending queries to the Foreman.

Minor error fix in PolicyEnforcer

Long lived Foreman thread

- Foreman thread lives through the lifetime of the Quickstep process.
- Foreman and main thread communicate through TMB messages.
- Foreman admits queries and routes them to PolicyEnforcer.
- Foreman relays messages to policy enforcer which in turn processes it,
  based on the query ID of the message.
- All the tests modified accordingly.

IWYU and lint fixes.

Code review changes.

Partition aware selection using NUMA-awareness (#175)

Link: https://github.com/pivotalsoftware/quickstep/pull/175

Fixes bug which disallows the creation of BITWEAVING_H indices. (#204)

Fixes a bug where numa settings were not properly set (not compiled in) (#205)

Change default aggregate_hashtable_type from LinearOpenAddressing to SeparateChaining (#207)

Revert "Change default aggregate_hashtable_type from LinearOpenAddressing to SeparateChaining" (#208)

Transaction Part 4: LockManager, CycleDetector and DeadLockDetector. (#187)

Change default aggregate_hashtable_type from LinearOpenAddressing to SeparateChaining  (#210)

Added support for the substring function. (#211)

Cleanup in Preloader class (#209)

Added print_query gflag (#213)

Refactor hasNUMAPlacementScheme. (#215)

Added a method for double parsing with significant number argument. (#216)

Leave 10% more memory for the OS. (#218)

Added a quick fix to the count(*) problem (#220)

Fixed a g++ compiler warning (#221)

Minor improvements in FileManagers. (#224)

Quickstep print catalog (#222)

Change the default value of Joined Tuple Collector (#226)

On NUMA boxes, the vector based joined tuple collector slows down queries.

Initial support for collecting table statistics: number of distinct values (#227)

Added hash join order optimization for star schema queries. (#229)

In ScopedBuffer, initialize the allocated memory bytes to 0 (#230)

Quickstep gen stats (#225)

Revert "Quickstep gen stats" (#231)

Reverting the PR that gets the number of rows (#233)

Resolves SMA bugs revealed by TPCH. (#234)

Revert "Explicitly specify where tcmalloc comes from" (#237)

Groupby hashtable pool (#236)

- Created a HashTablePool class for group by clause.
- Each thread can checkout it's own hash table while doing group by
  aggregation.
- AggregationOperationState uses one hash table pool per group by
  clause.

QUICKSTEP-1: Add incubation disclaimer to README.md.

This closes #2

Fixed Travis CI link in Readme.

Added BlockLocator.

Added Async DataExchange Service.

Bug fixed in isFull() function.

- Earlier check for isFull() was that whether there is at least one
empty bucket available for insertion.
- Now isFull() can check if there is enough space adding specified
number of buckets.
- To reproduce the bug behind this fix: Run TPC-H Q2 on SF100 dataset after
  running the \analyze command. The optimizer allocates smaller than
  necessary space for one of the hash joins. The resize() is triggered
  but never gets executed because of the restrictive isFull() condition.

Added query ID to relational operators and Aggregation op.

Added query ID to WorkOrder base class and Agg work order.

Added query ID to BuildHash operator and WorkOrders.

Checking if Travis Ubuntu uses pre-built clang-3.7 instead of downloading.

Added query ID to hash join operator.

Query ID added in CreateTable, Index, Delete and DestroyHash

Added query ID to all the remaining operators and work orders.

Doxygen and minor bug fixes.

Manually resetting the changes in the travis YAML.

Reordered query ID in aggregation operator

Reordered query ID in operators and work orders.

Reordered query ID in CreateIndex.

Reordered query ID in delete.

Reordered query ID in destroy hash.

Reordered query ID in Drop table.

Reordered query ID in Finalize aggregate

Reordered query ID in hash join

Reordered query ID in insert op

Reordered query ID in nested loops

Reordered query ID in rebuild

Reordered query ID in sample

Reordered query ID in save blocks

Reordered query ID in select

Reordered query ID in sort merge

Reordered query ID in sort run

Reordered query ID in table gen

Reorder query ID in text scan

Reordered query ID in update

Changes during the rebase with origin master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a690455e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a690455e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a690455e

Branch: refs/heads/query-manager-used-in-foreman
Commit: a690455ec963d670a3770a3238cafee3e66fafb7
Parents: fd75e17
Author: Harshad Deshmukh <ha...@cs.wisc.edu>
Authored: Sat Apr 9 15:08:40 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 9 11:09:38 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +
 cli/CommandExecutor.cpp                         |  53 +-
 cli/CommandExecutor.hpp                         |  11 +-
 cli/QuickstepCli.cpp                            |  70 +--
 cli/tests/CMakeLists.txt                        |   2 +
 cli/tests/CommandExecutorTestRunner.cpp         |  29 +-
 cli/tests/CommandExecutorTestRunner.hpp         |  37 +-
 query_execution/AdmitRequestMessage.hpp         |  73 +++
 query_execution/CMakeLists.txt                  |  68 +--
 query_execution/Foreman.cpp                     | 578 +++++--------------
 query_execution/Foreman.hpp                     | 394 +------------
 query_execution/PolicyEnforcer.cpp              | 177 ++++++
 query_execution/PolicyEnforcer.hpp              | 168 ++++++
 query_execution/QueryContext.cpp                |   3 +-
 query_execution/QueryContext.hpp                |   9 +
 query_execution/QueryContext.proto              |   2 +
 query_execution/QueryExecutionMessages.proto    |   4 +
 query_execution/QueryExecutionTypedefs.hpp      |   4 +-
 query_execution/QueryExecutionUtil.hpp          |  52 ++
 query_execution/QueryManager.hpp                |   5 +-
 query_execution/WorkOrdersContainer.hpp         |  70 +--
 query_execution/Worker.cpp                      |  11 +-
 query_execution/Worker.hpp                      |   2 +
 query_execution/WorkerMessage.hpp               |  22 +-
 query_execution/tests/QueryManager_unittest.cpp |   6 +
 .../tests/WorkOrdersContainer_unittest.cpp      |  26 +
 query_optimizer/ExecutionGenerator.cpp          |  32 +-
 query_optimizer/ExecutionGenerator.hpp          |   8 +
 query_optimizer/tests/CMakeLists.txt            |   2 +
 .../tests/ExecutionGeneratorTestRunner.cpp      |  23 +-
 .../tests/ExecutionGeneratorTestRunner.hpp      |  33 +-
 query_optimizer/tests/TestDatabaseLoader.cpp    |   1 +
 relational_operators/AggregationOperator.hpp    |   2 +
 relational_operators/DeleteOperator.cpp         |   1 +
 relational_operators/DeleteOperator.hpp         |   1 +
 relational_operators/DestroyHashOperator.hpp    |   4 +
 relational_operators/DropTableOperator.hpp      |   2 +
 relational_operators/HashJoinOperator.hpp       |   1 +
 relational_operators/RebuildWorkOrder.hpp       |   1 +
 relational_operators/SortMergeRunOperator.cpp   |   1 +
 relational_operators/TableGeneratorOperator.hpp |   2 +
 relational_operators/TextScanOperator.cpp       |   3 +
 relational_operators/TextScanOperator.hpp       |   1 +
 relational_operators/UpdateOperator.cpp         |   1 +
 relational_operators/UpdateOperator.hpp         |   1 +
 relational_operators/WorkOrder.hpp              |  20 +-
 .../tests/AggregationOperator_unittest.cpp      |   7 +
 .../tests/HashJoinOperator_unittest.cpp         |  12 +
 .../tests/SortMergeRunOperator_unittest.cpp     |   6 +
 .../SortRunGenerationOperator_unittest.cpp      |   2 +
 .../tests/TextScanOperator_unittest.cpp         |   2 +
 storage/InsertDestination.cpp                   |  49 +-
 storage/InsertDestination.hpp                   |  58 +-
 storage/InsertDestination.proto                 |   1 +
 54 files changed, 1107 insertions(+), 1048 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef7fd50..f674a78 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -711,9 +711,11 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_Worker
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index ddcd38f..dc14741 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -51,6 +51,8 @@
 
 #include "glog/logging.h"
 
+#include "tmb/id_typedefs.h"
+
 using std::fprintf;
 using std::fputc;
 using std::fputs;
@@ -58,6 +60,8 @@ using std::size_t;
 using std::string;
 using std::vector;
 
+namespace tmb { class MessageBus; }
+
 namespace quickstep {
 namespace cli {
 namespace {
@@ -194,11 +198,14 @@ void executeDescribeTable(
 /**
  * @brief A helper function that executes a SQL query to obtain a scalar result.
  */
-inline TypedValue executeQueryForSingleResult(const std::string &query_string,
-                                               StorageManager *storage_manager,
-                                               QueryProcessor *query_processor,
-                                               SqlParserWrapper *parser_wrapper,
-                                               Foreman *foreman) {
+inline TypedValue executeQueryForSingleResult(
+    const tmb::client_id main_thread_client_id,
+    const tmb::client_id foreman_client_id,
+    const std::string &query_string,
+    tmb::MessageBus *bus,
+    StorageManager *storage_manager,
+    QueryProcessor *query_processor,
+    SqlParserWrapper *parser_wrapper) {
   parser_wrapper->feedNextBuffer(new std::string(query_string));
 
   ParseResult result = parser_wrapper->getNextStatement();
@@ -210,11 +217,8 @@ inline TypedValue executeQueryForSingleResult(const std::string &query_string,
   DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
   // Use foreman to execute the query plan.
-  foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
-  foreman->reconstructQueryContextFromProto(query_handle->getQueryContextProto());
-
-  foreman->start();
-  foreman->join();
+  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+      main_thread_client_id, foreman_client_id, query_handle.get(), bus);
 
   // Retrieve the scalar result from the result relation.
   const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
@@ -246,8 +250,10 @@ inline TypedValue executeQueryForSingleResult(const std::string &query_string,
   return value;
 }
 
-void executeAnalyze(QueryProcessor *query_processor,
-                    Foreman *foreman,
+void executeAnalyze(const tmb::client_id main_thread_client_id,
+                    const tmb::client_id foreman_client_id,
+                    MessageBus *bus,
+                    QueryProcessor *query_processor,
                     FILE *out) {
   const CatalogDatabase &database = *query_processor->getDefaultDatabase();
   StorageManager *storage_manager = query_processor->getStorageManager();
@@ -273,11 +279,13 @@ void executeAnalyze(QueryProcessor *query_processor,
       query_string.append(";");
 
       TypedValue num_distinct_values =
-          executeQueryForSingleResult(query_string,
+          executeQueryForSingleResult(main_thread_client_id,
+                                      foreman_client_id,
+                                      query_string,
+                                      bus,
                                       storage_manager,
                                       query_processor,
-                                      parser_wrapper.get(),
-                                      foreman);
+                                      parser_wrapper.get());
 
       DCHECK(num_distinct_values.getTypeID() == TypeID::kLong);
       mutable_relation->getStatisticsMutable()->setNumDistinctValues(
@@ -291,11 +299,13 @@ void executeAnalyze(QueryProcessor *query_processor,
     query_string.append(";");
 
     TypedValue num_tuples =
-        executeQueryForSingleResult(query_string,
+        executeQueryForSingleResult(main_thread_client_id,
+                                    foreman_client_id,
+                                    query_string,
+                                    bus,
                                     storage_manager,
                                     query_processor,
-                                    parser_wrapper.get(),
-                                    foreman);
+                                    parser_wrapper.get());
 
     DCHECK(num_tuples.getTypeID() == TypeID::kLong);
     mutable_relation->getStatisticsMutable()->setNumTuples(
@@ -312,9 +322,11 @@ void executeAnalyze(QueryProcessor *query_processor,
 
 void executeCommand(const ParseStatement &statement,
                     const CatalogDatabase &catalog_database,
+                    const tmb::client_id main_thread_client_id,
+                    const tmb::client_id foreman_client_id,
+                    MessageBus *bus,
                     StorageManager *storage_manager,
                     QueryProcessor *query_processor,
-                    Foreman *foreman,
                     FILE *out) {
   const ParseCommand &command = static_cast<const ParseCommand &>(statement);
   const PtrVector<ParseString> *arguments = command.arguments();
@@ -328,7 +340,8 @@ void executeCommand(const ParseStatement &statement,
       executeDescribeTable(arguments, catalog_database, out);
     }
   } else if (command_str == C::kAnalyzeCommand) {
-    executeAnalyze(query_processor, foreman, out);
+    executeAnalyze(
+        main_thread_client_id, foreman_client_id, bus, query_processor, out);
   } else {
     THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index c819981..3435aeb 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -21,10 +21,14 @@
 #include <cstdio>
 #include <string>
 
+#include "tmb/id_typedefs.h"
+
 using std::fprintf;
 using std::fputc;
 using std::string;
 
+namespace tmb { class MessageBus; }
+
 namespace quickstep {
 
 class CatalogDatabase;
@@ -53,6 +57,9 @@ constexpr char kAnalyzeCommand[] = "\\analyze";
   *
   * @param statement The parsed statement from the cli.
   * @param catalog_database The catalog information about the current database.
+  * @param main_thread_client_id The TMB client ID of the main thread.
+  * @param foreman_client_id The TMB client ID of the Foreman thread.
+  * @param bus A pointer to the TMB.
   * @param storage_manager The current StorageManager.
   * @param query_processor The query processor to generate plans for SQL queries.
   * @param foreman The foreman to execute query plans.
@@ -60,9 +67,11 @@ constexpr char kAnalyzeCommand[] = "\\analyze";
 */
 void executeCommand(const ParseStatement &statement,
                     const CatalogDatabase &catalog_database,
+                    const tmb::client_id main_thread_client_id,
+                    const tmb::client_id foreman_client_id,
+                    tmb::MessageBus *bus,
                     StorageManager *storage_manager,
                     QueryProcessor *query_processor,
-                    Foreman *foreman,
                     FILE *out);
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 558d6eb..3202c96 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -53,8 +53,10 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
 #include "parser/SqlParserWrapper.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/Foreman.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_execution/WorkerMessage.hpp"
@@ -95,6 +97,7 @@ using std::string;
 using std::vector;
 
 using quickstep::Address;
+using quickstep::AdmitRequestMessage;
 using quickstep::CatalogRelation;
 using quickstep::DefaultsConfigurator;
 using quickstep::DropRelation;
@@ -107,6 +110,7 @@ using quickstep::ParseResult;
 using quickstep::ParseStatement;
 using quickstep::PrintToScreen;
 using quickstep::PtrVector;
+using quickstep::QueryExecutionUtil;
 using quickstep::QueryHandle;
 using quickstep::QueryPlan;
 using quickstep::QueryProcessor;
@@ -115,9 +119,12 @@ using quickstep::TaggedMessage;
 using quickstep::Worker;
 using quickstep::WorkerDirectory;
 using quickstep::WorkerMessage;
+using quickstep::kAdmitRequestMessage;
 using quickstep::kPoisonMessage;
+using quickstep::kWorkloadCompletionMessage;
 
 using tmb::client_id;
+using tmb::AnnotatedMessage;
 
 namespace quickstep {
 
@@ -197,7 +204,9 @@ int main(int argc, char* argv[]) {
 
   // The TMB client id for the main thread, used to kill workers at the end.
   const client_id main_thread_client_id = bus.Connect();
+  bus.RegisterClientAsSender(main_thread_client_id, kAdmitRequestMessage);
   bus.RegisterClientAsSender(main_thread_client_id, kPoisonMessage);
+  bus.RegisterClientAsReceiver(main_thread_client_id, kWorkloadCompletionMessage);
 
   // Setup the paths used by StorageManager.
   string fixed_storage_path(quickstep::FLAGS_storage_path);
@@ -283,12 +292,6 @@ int main(int argc, char* argv[]) {
            std::chrono::duration<double>(preload_end - preload_start).count());
   }
 
-  Foreman foreman(&bus,
-                  query_processor->getDefaultDatabase(),
-                  query_processor->getStorageManager(),
-                  -1, /* CPU id to bind foreman. -1 is unbound. */
-                  num_numa_nodes_system);
-
   // Get the NUMA affinities for workers.
   vector<int> cpu_numa_nodes = InputParserUtil::GetNUMANodesForCPUs();
   if (cpu_numa_nodes.empty()) {
@@ -323,13 +326,20 @@ int main(int argc, char* argv[]) {
                                    worker_client_ids,
                                    worker_numa_nodes);
 
-  foreman.setWorkerDirectory(&worker_directory);
+  Foreman foreman(main_thread_client_id,
+                  &worker_directory,
+                  &bus,
+                  query_processor->getDefaultDatabase(),
+                  query_processor->getStorageManager(),
+                  num_numa_nodes_system);
 
   // Start the worker threads.
   for (Worker &worker : workers) {
     worker.start();
   }
 
+  foreman.start();
+
   LineReaderImpl line_reader("quickstep> ",
                              "      ...> ");
   std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
@@ -366,9 +376,11 @@ int main(int argc, char* argv[]) {
             quickstep::cli::executeCommand(
                 *result.parsed_statement,
                 *(query_processor->getDefaultDatabase()),
+                main_thread_client_id,
+                foreman.getBusClientID(),
+                &bus,
                 query_processor->getStorageManager(),
                 query_processor.get(),
-                &foreman,
                 stdout);
           } catch (const quickstep::SqlError &sql_error) {
             fprintf(stderr, "%s",
@@ -389,14 +401,20 @@ int main(int argc, char* argv[]) {
         }
 
         DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-        foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
-
-        foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
-
+        AdmitRequestMessage request_message(query_handle.get());
+        TaggedMessage admit_tagged_message(
+            &request_message, sizeof(request_message), kAdmitRequestMessage);
+
+        start = std::chrono::steady_clock::now();
+        QueryExecutionUtil::SendTMBMessage(&bus,
+                                           main_thread_client_id,
+                                           foreman.getBusClientID(),
+                                           std::move(admit_tagged_message));
         try {
-          start = std::chrono::steady_clock::now();
-          foreman.start();
-          foreman.join();
+          const AnnotatedMessage annotated_msg =
+              bus.Receive(main_thread_client_id, 0, true);
+          const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
           end = std::chrono::steady_clock::now();
 
           const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
@@ -440,29 +458,13 @@ int main(int argc, char* argv[]) {
     }
   }
 
-  // Terminate all workers before exiting.
-  // The main thread broadcasts poison message to the workers. Each worker dies
-  // after receiving poison message. The order of workers' death is irrelavant.
-  MessageStyle style;
-  style.Broadcast(true);
-  Address address;
-  address.All(true);
-  std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
-  TaggedMessage poison_tagged_message(poison_message.get(),
-                                      sizeof(*poison_message),
-                                      kPoisonMessage);
-
-  const tmb::MessageBus::SendStatus send_status =
-      bus.Send(main_thread_client_id,
-               address,
-               style,
-               std::move(poison_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-     "Broadcast message from Foreman to workers failed";
+  // Kill the foreman and workers.
+  QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id, &bus);
 
   for (Worker &worker : workers) {
     worker.join();
   }
 
+  foreman.join();
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index ca37e4a..d177d6c 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -32,9 +32,11 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_Worker
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 9cd493e..794f7e1 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -20,12 +20,15 @@
 #include <cstdio>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "cli/CommandExecutor.hpp"
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/Foreman.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_optimizer/ExecutionGenerator.hpp"
 #include "query_optimizer/LogicalGenerator.hpp"
@@ -41,6 +44,8 @@
 
 #include "glog/logging.h"
 
+#include "tmb/tagged_message.h"
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -87,9 +92,11 @@ void CommandExecutorTestRunner::runTestCase(
           quickstep::cli::executeCommand(
               *result.parsed_statement,
               *(test_database_loader_.catalog_database()),
+              main_thread_client_id_,
+              foreman_->getBusClientID(),
+              &bus_,
               test_database_loader_.storage_manager(),
               nullptr,
-              nullptr,
               output_stream.file());
         } else  {
           QueryHandle query_handle(optimizer_context.query_id());
@@ -100,14 +107,20 @@ void CommandExecutorTestRunner::runTestCase(
               physical_generator.generatePlan(
                   logical_generator.generatePlan(*result.parsed_statement));
           execution_generator.generatePlan(physical_plan);
-          foreman_->setQueryPlan(
-              query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
-
-          foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
-
-          foreman_->start();
-          foreman_->join();
 
+          AdmitRequestMessage request_message(&query_handle);
+          TaggedMessage admit_tagged_message(
+              &request_message, sizeof(request_message), kAdmitRequestMessage);
+          QueryExecutionUtil::SendTMBMessage(&bus_,
+                                             main_thread_client_id_,
+                                             foreman_->getBusClientID(),
+                                             std::move(admit_tagged_message));
+
+          // Receive workload completion message from Foreman.
+          const AnnotatedMessage annotated_msg =
+              bus_.Receive(main_thread_client_id_, 0, true);
+          const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
           const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
           if (query_result_relation) {
             PrintToScreen::PrintRelation(*query_result_relation,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/cli/tests/CommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp
index 94b1d6a..8fb5b65 100644
--- a/cli/tests/CommandExecutorTestRunner.hpp
+++ b/cli/tests/CommandExecutorTestRunner.hpp
@@ -27,6 +27,7 @@
 #include "parser/SqlParserWrapper.hpp"
 #include "query_execution/Foreman.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_execution/WorkerMessage.hpp"
@@ -34,6 +35,9 @@
 #include "utility/Macros.hpp"
 #include "utility/textbased_test/TextBasedTestDriver.hpp"
 
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
 namespace quickstep {
 
 /**
@@ -57,9 +61,11 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
 
     bus_.Initialize();
 
-    foreman_.reset(new Foreman(&bus_,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager()));
+    main_thread_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);
+
     worker_.reset(new Worker(0, &bus_));
 
     std::vector<client_id> worker_client_ids;
@@ -71,27 +77,20 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
 
     workers_.reset(new WorkerDirectory(1 /* number of workers */,
                                        worker_client_ids, numa_nodes));
-    foreman_->setWorkerDirectory(workers_.get());
+    foreman_.reset(new Foreman(main_thread_client_id_,
+                               workers_.get(),
+                               &bus_,
+                               test_database_loader_.catalog_database(),
+                               test_database_loader_.storage_manager()));
 
+    foreman_->start();
     worker_->start();
   }
 
   ~CommandExecutorTestRunner() {
-    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
-    TaggedMessage poison_tagged_message(poison_message.get(),
-                                        sizeof(*poison_message),
-                                        quickstep::kPoisonMessage);
-
-    Address worker_address;
-    MessageStyle single_receiver_style;
-
-    worker_address.AddRecipient(worker_->getBusClientID());
-    bus_.Send(foreman_->getBusClientID(),
-              worker_address,
-              single_receiver_style,
-              std::move(poison_tagged_message));
-
+    QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_);
     worker_->join();
+    foreman_->join();
   }
 
   void runTestCase(const std::string &input,
@@ -102,6 +101,8 @@ class CommandExecutorTestRunner : public TextBasedTestRunner {
   SqlParserWrapper sql_parser_;
   optimizer::TestDatabaseLoader test_database_loader_;
 
+  tmb::client_id main_thread_client_id_;
+
   MessageBusImpl bus_;
   std::unique_ptr<Foreman> foreman_;
   std::unique_ptr<Worker> worker_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
new file mode 100644
index 0000000..e33b354
--- /dev/null
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -0,0 +1,73 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class QueryHandle;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A message requesting a query or queries to be admitted to the system.
+ **/
+class AdmitRequestMessage {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_handles The handles of the queries requesting to be admitted
+   *        to the system.
+   **/
+  explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
+      : query_handles_(query_handles) {}
+
+  /**
+   * @brief Constructor for requesting single query admission.
+   *
+   * @param query_handle The handle of the query requesting to be admitted.
+   **/
+  explicit AdmitRequestMessage(QueryHandle *query_handle) {
+    query_handles_.push_back(query_handle);
+  }
+
+  /**
+   * @brief Get the query handles from this message.
+   **/
+  const std::vector<QueryHandle*>& getQueryHandles() const {
+    return query_handles_;
+  }
+
+ private:
+  std::vector<QueryHandle*> query_handles_;
+
+  DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 7d9d601..2f738e4 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -23,8 +23,10 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -43,6 +45,8 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
 add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
 
 # Link dependencies:
+target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
+                      quickstep_utility_Macros)
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_BlockLocator
                         glog
@@ -57,29 +61,17 @@ if (ENABLE_DISTRIBUTED)
                         tmb)
 endif()
 target_link_libraries(quickstep_queryexecution_Foreman
+                      gflags_nothreads-static
                       glog
-                      gtest
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_catalog_PartitionScheme
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionState
+                      quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_WorkOrdersContainer
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
-                      quickstep_relationaloperators_RebuildWorkOrder
-                      quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_InsertDestination
-                      quickstep_storage_StorageBlock
-                      quickstep_storage_StorageBlockInfo
                       quickstep_threading_ThreadUtil
-                      quickstep_utility_DAG
+                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_ForemanLite
@@ -87,6 +79,16 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_PolicyEnforcer
+                      glog
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_WorkerMessage
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_queryexecution_QueryContext
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -128,7 +130,9 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_threading_ThreadIDBasedMap
                       tmb)
 target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_WorkerMessage
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_QueryManager
@@ -178,8 +182,10 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
 # Module all-in-one library:
 add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
 target_link_libraries(quickstep_queryexecution
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto
@@ -223,36 +229,6 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
-add_executable(Foreman_unittest
-  "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
-target_link_libraries(Foreman_unittest
-                      glog
-                      gtest
-                      gtest_main
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_Foreman
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_WorkOrdersContainer
-                      quickstep_queryexecution_WorkerDirectory
-                      quickstep_queryexecution_WorkerMessage
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_WorkOrder
-                      quickstep_storage_InsertDestination
-                      quickstep_storage_InsertDestination_proto
-                      quickstep_storage_StorageBlock
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageManager
-                      quickstep_utility_DAG
-                      quickstep_utility_Macros
-                      tmb)
-add_test(Foreman_unittest Foreman_unittest)
-
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
 target_link_libraries(QueryManager_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 7705819..3609120 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -22,355 +22,189 @@
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_execution/WorkerMessage.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
 #include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
 using std::move;
-using std::pair;
 using std::size_t;
+using std::unique_ptr;
 using std::vector;
 
 namespace quickstep {
 
-void Foreman::initialize() {
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+              "of pending work orders for the worker. This information is used "
+              "by the Foreman to assign work orders to worker threads");
+
+Foreman::Foreman(const tmb::client_id main_thread_client_id,
+                 WorkerDirectory *worker_directory,
+                 tmb::MessageBus *bus,
+                 CatalogDatabaseLite *catalog_database,
+                 StorageManager *storage_manager,
+                 const int cpu_id,
+                 const size_t num_numa_nodes)
+    : ForemanLite(bus, cpu_id),
+      main_thread_client_id_(main_thread_client_id),
+      worker_directory_(DCHECK_NOTNULL(worker_directory)),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
+      kPoisonMessage,
+      kRebuildWorkOrderMessage,
+      kWorkOrderMessage,
+      kWorkloadCompletionMessage};
+
+  for (const auto message_type : sender_message_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+  }
+
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kAdmitRequestMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kPoisonMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kWorkOrdersAvailableMessage,
+      kWorkOrderCompleteMessage};
+
+  for (const auto message_type : receiver_message_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcer(
+      foreman_client_id_,
+      num_numa_nodes,
+      catalog_database_,
+      storage_manager_,
+      bus_));
+}
+
+void Foreman::run() {
   if (cpu_id_ >= 0) {
     // We can pin the foreman thread to a CPU if specified.
     ThreadUtil::BindToCPU(cpu_id_);
   }
-  initializeState();
-
-  DEBUG_ASSERT(query_dag_ != nullptr);
-  const dag_node_index dag_size = query_dag_->size();
-
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < dag_size; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
-    }
-  }
-
-  // Dispatch the WorkOrders generated so far.
-  dispatchWorkerMessages(0, 0);
-}
-
-void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
-                                              const size_t worker_thread_index) {
-  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
-  // As the given worker finished executing a WorkOrder, decrement its number
-  // of queued WorkOrders.
-  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
-  // Check if new work orders are available and fetch them if so.
-  fetchNormalWorkOrders(op_index);
-
-  if (checkRebuildRequired(op_index)) {
-    if (checkNormalExecutionOver(op_index)) {
-      if (!checkRebuildInitiated(op_index)) {
-        if (initiateRebuild(op_index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(op_index);
-        } else {
-          // Rebuild under progress.
-        }
-      } else if (checkRebuildOver(op_index)) {
-        // Rebuild was under progress and now it is over.
-        markOperatorFinished(op_index);
-      }
-    } else {
-      // Normal execution under progress for this operator.
-    }
-  } else if (checkOperatorExecutionOver(op_index)) {
-    // Rebuild not required for this operator and its normal execution is
-    // complete.
-    markOperatorFinished(op_index);
-  }
-
-  for (const pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      // Process the dependent operator (of the operator whose WorkOrder
-      // was just executed) for which all the dependencies have been met.
-      processOperator(dependent_op_index, true);
-    }
-  }
-
-  // Dispatch the WorkerMessages to the workers. We prefer to start the search
-  // for the schedulable WorkOrders beginning from 'op_index'. The first
-  // candidate worker to receive the next WorkOrder is the one that sent the
-  // response message to Foreman.
-  dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
-                                                     const size_t worker_thread_index) {
-  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
-  if (checkRebuildOver(op_index)) {
-    markOperatorFinished(op_index);
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(op_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-        processOperator(dependent_op_index, true);
-      }
-    }
-  }
-
-  // Dispatch the WorkerMessages to the workers. We prefer to start the search
-  // for the schedulable WorkOrders beginning from 'op_index'. The first
-  // candidate worker to receive the next WorkOrder is the one that sent the
-  // response message to Foreman.
-  dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processDataPipelineMessage(const dag_node_index op_index,
-                                         const block_id block,
-                                         const relation_id rel_id) {
-  for (const dag_node_index consumer_index :
-       output_consumers_[op_index]) {
-    // Feed the streamed block to the consumer. Note that 'output_consumers_'
-    // only contain those dependents of operator with index = op_index which are
-    // eligible to receive streamed input.
-    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
-    // Because of the streamed input just fed, check if there are any new
-    // WorkOrders available and if so, fetch them.
-    fetchNormalWorkOrders(consumer_index);
-  }
-
-  // Dispatch the WorkerMessages to the workers. We prefer to start the search
-  // for the schedulable WorkOrders beginning from 'op_index'. The first
-  // candidate worker to receive the next WorkOrder is the one that sent the
-  // response message to Foreman.
-  // TODO(zuyu): Improve the data locality for the next WorkOrder.
-  dispatchWorkerMessages(0, op_index);
-}
-
-void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
-  RelationalOperator *op =
-      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
-  op->receiveFeedbackMessage(msg);
-}
-
-void Foreman::run() {
-  // Initialize before for Foreman eventloop.
-  initialize();
 
   // Event loop
-  while (!query_exec_state_->hasQueryExecutionFinished()) {
+  for (;;) {
     // Receive() causes this thread to sleep until next message is received.
-    AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
+    const AnnotatedMessage annotated_msg =
+        bus_->Receive(foreman_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    switch (tagged_message.message_type()) {
-      case kWorkOrderCompleteMessage: {
-        serialization::WorkOrderCompletionMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
-        break;
-      }
-      case kRebuildWorkOrderCompleteMessage: {
-        serialization::WorkOrderCompletionMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    switch (message_type) {
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrdersAvailableMessage: {
+        policy_enforcer_->processMessage(tagged_message);
         break;
       }
-      case kCatalogRelationNewBlockMessage: {
-        serialization::CatalogRelationNewBlockMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const block_id block = proto.block_id();
-
-        CatalogRelation *relation =
-            static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-        relation->addBlock(block);
-
-        if (proto.has_partition_id()) {
-          relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
+      case kAdmitRequestMessage: {
+        const AdmitRequestMessage *msg =
+            static_cast<const AdmitRequestMessage *>(tagged_message.message());
+        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
+        DCHECK(!query_handles.empty());
+        bool all_queries_admitted = true;
+        if (query_handles.size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles.front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
         }
         break;
       }
-      case kDataPipelineMessage: {
-        // Possible message senders include InsertDestinations and some
-        // operators which modify existing blocks.
-        serialization::DataPipelineMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processDataPipelineMessage(proto.operator_index(), proto.block_id(), proto.relation_id());
-        break;
-      }
-      case kWorkOrdersAvailableMessage: {
-        serialization::WorkOrdersAvailableMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const dag_node_index op_index = proto.operator_index();
-
-        // Check if new work orders are available.
-        fetchNormalWorkOrders(op_index);
-
-        // Dispatch the WorkerMessages to the workers. We prefer to start the search
-        // for the schedulable WorkOrders beginning from 'op_index'. The first
-        // candidate worker to receive the next WorkOrder is the one that sent the
-        // response message to Foreman.
-        // TODO(zuyu): Improve the data locality for the next WorkOrder.
-        dispatchWorkerMessages(0, op_index);
-        break;
-      }
-      case kWorkOrderFeedbackMessage: {
-        WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()),
-                                       tagged_message.message_bytes());
-        processFeedbackMessage(msg);
-        break;
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "Foreman thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+        return;
       }
       default:
         LOG(FATAL) << "Unknown message type to Foreman";
     }
-  }
-
-  // Clean up before exiting.
-  cleanUp();
-}
 
-void Foreman::dispatchWorkerMessages(
-    const size_t start_worker_index,
-    const dag_node_index start_operator_index) {
-  // Loop over all workers. Stopping criteria:
-  // 1. Every worker has been assigned exactly max_msgs_per_worker_ workorders.
-  // OR 2. No schedulable workorders at this time.
-  size_t done_workers_count = 0;
-  for (size_t curr_worker = start_worker_index;
-       done_workers_count < workers_->getNumWorkers();
-       curr_worker = (curr_worker + 1) % workers_->getNumWorkers()) {
-    if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) {
-      std::unique_ptr<WorkerMessage> msg;
-      msg.reset(getNextWorkerMessage(
-          start_operator_index, workers_->getNUMANode(curr_worker)));
-      if (msg.get() != nullptr) {
-        sendWorkerMessage(curr_worker, *msg);
-        workers_->incrementNumQueuedWorkOrders(curr_worker);
-      } else {
-        // No schedulable workorder at this point.
-        ++done_workers_count;
-      }
-    } else {
-      // curr_worker already has been assigned max_msgs_per_worker workorders.
-      ++done_workers_count;
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<WorkerMessage>> new_messages;
+      policy_enforcer_->getWorkerMessages(&new_messages);
+      dispatchWorkerMessages(new_messages);
+    }
+
+    // We check again, as some queries may produce zero work orders and finish
+    // their execution.
+    if (!policy_enforcer_->hasQueries()) {
+      // Signal the main thread that there are no queries to be executed.
+      // Currently the message doesn't have any real content.
+      const int dummy_payload = 0;
+      TaggedMessage completion_tagged_message(
+          &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+      const tmb::MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(
+              bus_,
+              foreman_client_id_,
+              main_thread_client_id_,
+              move(completion_tagged_message));
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+          << "Message could not be sent from Foreman with TMB client ID "
+          << foreman_client_id_ << " to main thread with TMB client ID"
+          << main_thread_client_id_;
     }
   }
 }
 
-void Foreman::initializeState() {
-  const dag_node_index dag_size = query_dag_->size();
-
-  output_consumers_.resize(dag_size);
-  blocking_dependencies_.resize(dag_size);
-
-  query_exec_state_.reset(new QueryExecutionState(dag_size));
-  workorders_container_.reset(new WorkOrdersContainer(dag_size, num_numa_nodes_));
-
-  for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) {
-    const QueryContext::insert_destination_id insert_destination_index =
-        query_dag_->getNodePayload(node_index).getInsertDestinationID();
-    if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
-      // Rebuild is necessary whenever InsertDestination is present.
-      query_exec_state_->setRebuildRequired(node_index);
-      query_exec_state_->setRebuildStatus(node_index, 0, false);
-    }
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(node_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
-        // The link is not a pipeline-breaker. Streaming of blocks is possible
-        // between these two operators.
-        output_consumers_[node_index].push_back(dependent_op_index);
-      } else {
-        // The link is a pipeline-breaker. Streaming of blocks is not possible
-        // between these two operators.
-        blocking_dependencies_[dependent_op_index].push_back(node_index);
-      }
-    }
+bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) {
+  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                    kCatalogRelationNewBlockMessage,
+                                    kWorkOrderFeedbackMessage)) {
+    return false;
+  } else if (worker_directory_->getLeastLoadedWorker().second <=
+             FLAGS_min_load_per_worker) {
+    // If the least loaded worker has only one pending work order, we should
+    // collect new messages and dispatch them.
+    return true;
+  } else {
+    return false;
   }
 }
 
-// TODO(harshad) : The default policy may execute remote WorkOrders for an
-// operator with a lower index even when there are local WorkOrders available for
-// an operator with higher index. We should examine if avoiding this behavior
-// has any benefits with respect to execution time and/or memory pressure.
-WorkerMessage* Foreman::getNextWorkerMessage(
-    const dag_node_index start_operator_index, const int numa_node) {
-  // Default policy: Operator with lowest index first.
-  WorkOrder *work_order = nullptr;
-  size_t num_operators_checked = 0;
-  for (dag_node_index index = start_operator_index;
-       num_operators_checked < query_dag_->size();
-       index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
-    }
-    if (numa_node != -1) {
-      // First try to get a normal WorkOrder from the specified NUMA node.
-      work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
-      if (work_order != nullptr) {
-        // A WorkOrder found on the given NUMA node.
-        query_exec_state_->incrementNumQueuedWorkOrders(index);
-        return WorkerMessage::WorkOrderMessage(work_order, index);
-      } else {
-        // Normal workorder not found on this node. Look for a rebuild workorder
-        // on this NUMA node.
-        work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
-        if (work_order != nullptr) {
-          return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-        }
-      }
-    }
-    // Either no workorder found on the given NUMA node, or numa_node is -1.
-    // Try to get a normal WorkOrder from other NUMA nodes.
-    work_order = workorders_container_->getNormalWorkOrder(index);
-    if (work_order != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-      return WorkerMessage::WorkOrderMessage(work_order, index);
+void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
+  for (auto const &message : messages) {
+    DCHECK(message != nullptr);
+    int recipient_worker_thread_index = message->getRecipientHint();
+    if (recipient_worker_thread_index != -1) {
+      sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
+                        *message);
     } else {
-      // Normal WorkOrder not found, look for a RebuildWorkOrder.
-      work_order = workorders_container_->getRebuildWorkOrder(index);
-      if (work_order != nullptr) {
-        return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-      }
+      sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
+                        *message);
     }
   }
-  // No WorkOrders available right now.
-  return nullptr;
 }
 
-void Foreman::sendWorkerMessage(const std::size_t worker_thread_index,
+void Foreman::sendWorkerMessage(const size_t worker_thread_index,
                                 const WorkerMessage &message) {
-  message_type_id type;
+  tmb::message_type_id type;
   if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) {
     type = kRebuildWorkOrderMessage;
   } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) {
@@ -383,152 +217,12 @@ void Foreman::sendWorkerMessage(const std::size_t worker_thread_index,
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
-                                         workers_->getClientID(worker_thread_index),
+                                         worker_directory_->getClientID(worker_thread_index),
                                          move(worker_tagged_message));
   CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
       "Message could not be sent from Foreman with TMB client ID "
       << foreman_client_id_ << " to Foreman with TMB client ID "
-      << workers_->getClientID(worker_thread_index);
-}
-
-bool Foreman::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorders = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorders_before =
-        workorders_container_->getNumNormalWorkOrders(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
-                                                                   query_context_.get(),
-                                                                   storage_manager_,
-                                                                   foreman_client_id_,
-                                                                   bus_);
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorders =
-        (num_pending_workorders_before <
-         workorders_container_->getNumNormalWorkOrders(index));
-  }
-  return generated_new_workorders;
-}
-
-void Foreman::processOperator(const dag_node_index index,
-                              const bool recursively_check_dependents) {
-  if (fetchNormalWorkOrders(index)) {
-    // Fetched work orders. Return to wait for the generated work orders to
-    // execute, and skip the execution-finished checks.
-    return;
-  }
-
-  if (checkNormalExecutionOver(index)) {
-    if (checkRebuildRequired(index)) {
-      if (!checkRebuildInitiated(index)) {
-        // Rebuild hasn't started, initiate it.
-        if (initiateRebuild(index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(index);
-        } else {
-          // Rebuild WorkOrders have been generated.
-          return;
-        }
-      } else if (checkRebuildOver(index)) {
-        // Rebuild had been initiated and it is over.
-        markOperatorFinished(index);
-      }
-    } else {
-      // Rebuild is not required and normal execution over, mark finished.
-      markOperatorFinished(index);
-    }
-    // If we reach here, that means the operator has been marked as finished.
-    if (recursively_check_dependents) {
-      for (const pair<dag_node_index, bool> &dependent_link :
-           query_dag_->getDependents(index)) {
-        const dag_node_index dependent_op_index = dependent_link.first;
-        if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-          processOperator(dependent_op_index, true);
-        }
-      }
-    }
-  }
-}
-
-void Foreman::markOperatorFinished(const dag_node_index index) {
-  query_exec_state_->setExecutionFinished(index);
-
-  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
-  op->updateCatalogOnCompletion();
-
-  const relation_id output_rel = op->getOutputRelationID();
-  for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
-    // Signal dependent operator that current operator is done feeding input blocks.
-    if (output_rel >= 0) {
-      dependent_op->doneFeedingInputBlocks(output_rel);
-    }
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      dependent_op->informAllBlockingDependenciesMet();
-    }
-  }
-}
-
-bool Foreman::initiateRebuild(const dag_node_index index) {
-  DEBUG_ASSERT(!workorders_container_->hasRebuildWorkOrder(index));
-  DEBUG_ASSERT(checkRebuildRequired(index));
-  DEBUG_ASSERT(!checkRebuildInitiated(index));
-
-  getRebuildWorkOrders(index, workorders_container_.get());
-
-  query_exec_state_->setRebuildStatus(
-      index, workorders_container_->getNumRebuildWorkOrders(index), true);
-
-  return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-}
-
-void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container) {
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-  const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
-
-  if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
-    return;
-  }
-
-  vector<MutableBlockReference> partially_filled_block_refs;
-
-  DCHECK(query_context_ != nullptr);
-  InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
-  DCHECK(insert_destination != nullptr);
-
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
-  for (vector<MutableBlockReference>::size_type i = 0;
-       i < partially_filled_block_refs.size();
-       ++i) {
-    // Note: The query ID used below is dummy for now, it will be replaced with
-    // the true query ID when QueryManager gets used in Foreman.
-    container->addRebuildWorkOrder(
-        new RebuildWorkOrder(0,
-                             move(partially_filled_block_refs[i]),
-                             index,
-                             op.getOutputRelationID(),
-                             foreman_client_id_,
-                             bus_),
-        index);
-  }
+      << worker_directory_->getClientID(worker_thread_index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 2d6e0d3..5c4893d 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -22,46 +22,40 @@
 #include <memory>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/ForemanLite.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "utility/DAG.hpp"
+#include "query_execution/PolicyEnforcer.hpp"
 #include "utility/Macros.hpp"
 
-#include "glog/logging.h"
-#include "gtest/gtest_prod.h"
+#include "gflags/gflags.h"
 
+#include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 
 namespace quickstep {
 
+DECLARE_uint64(min_load_per_worker);
+
 class CatalogDatabaseLite;
 class StorageManager;
 class WorkerDirectory;
-
-namespace serialization { class QueryContext; }
+class WorkerMessage;
 
 /** \addtogroup QueryExecution
  *  @{
  */
 
 /**
- * @brief The Foreman scans the query DAG, requests each operator to produce
- *        workorders. It also pipelines the intermediate output it receives to
- *        the relational operators which need it.
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to worker threads. It also
+ *        receives work completion messages from workers.
  **/
 class Foreman final : public ForemanLite {
  public:
   /**
    * @brief Constructor.
    *
+   * @param main_thread_client_id The TMB client ID of the main thread.
+   * @param worker_directory The worker directory.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
    * @param storage_manager The StorageManager to use.
@@ -71,233 +65,31 @@ class Foreman final : public ForemanLite {
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  Foreman(tmb::MessageBus *bus,
+  Foreman(const tmb::client_id main_thread_client_id,
+          WorkerDirectory *worker_directory,
+          tmb::MessageBus *bus,
           CatalogDatabaseLite *catalog_database,
           StorageManager *storage_manager,
           const int cpu_id = -1,
-          const int num_numa_nodes = 1)
-      : ForemanLite(bus, cpu_id),
-        catalog_database_(DCHECK_NOTNULL(catalog_database)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        max_msgs_per_worker_(1),
-        num_numa_nodes_(num_numa_nodes) {
-    bus_->RegisterClientAsSender(foreman_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsSender(foreman_client_id_, kRebuildWorkOrderMessage);
-    // NOTE : Foreman thread sends poison messages in the optimizer's
-    // ExecutionGeneratorTest.
-    bus_->RegisterClientAsSender(foreman_client_id_, kPoisonMessage);
-
-    bus_->RegisterClientAsReceiver(foreman_client_id_,
-                                   kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsReceiver(foreman_client_id_,
-                                   kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsReceiver(foreman_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsReceiver(foreman_client_id_,
-                                   kWorkOrdersAvailableMessage);
-    bus_->RegisterClientAsReceiver(foreman_client_id_,
-                                   kWorkOrderFeedbackMessage);
-  }
+          const std::size_t num_numa_nodes = 1);
 
   ~Foreman() override {}
 
-  /**
-   * @brief Set the Query plan DAG for the query to be executed.
-   *
-   * @param query_plan_dag A pointer to the query plan DAG.
-   **/
-  inline void setQueryPlan(DAG<RelationalOperator, bool> *query_plan_dag) {
-    query_dag_ = query_plan_dag;
-  }
-
-  /**
-   * @brief Reconstruct the QueryContext for the query to be executed.
-   *
-   * @param proto The serialized QueryContext.
-   **/
-  inline void reconstructQueryContextFromProto(const serialization::QueryContext &proto) {
-    query_context_.reset(
-        new QueryContext(proto, *catalog_database_, storage_manager_, foreman_client_id_, bus_));
-  }
-
-  /**
-   * @brief Set the WorkerDirectory pointer.
-   *
-   * @param workers A pointer to the WorkerDirectory.
-   **/
-  void setWorkerDirectory(WorkerDirectory *workers) {
-    workers_ = workers;
-  }
-
-  /**
-   * @brief Set the maximum number of messages that should be allocated to each
-   *        worker during a single round of WorkOrder dispatch.
-   *
-   * @param max_msgs_per_worker Maximum number of messages.
-   **/
-  void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
-    max_msgs_per_worker_ = max_msgs_per_worker;
-  }
-
  protected:
   /**
-   * @brief The foreman receives a DAG of relational operators, asks relational
-   *        operators to produce the workorders and based on the response it gets
-   *        pipelines the intermediate output to dependent relational operators.
-   *
-   * @note  The workers who get the messages from the Foreman execute and
-   *        subsequently delete the WorkOrder contained in the message.
+   * @brief Run the event-based loop in the Foreman thread.
    **/
   void run() override;
 
  private:
-  typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
-
-  /**
-   * @brief Check if all the dependencies of the node at specified index have
-   *        finished their execution.
-   *
-   * @note This function's true return value is a pre-requisite for calling
-   *       getRebuildWorkOrders()
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the dependencies have finished their execution. False
-   *         otherwise.
-   **/
-  inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
-    for (const dag_node_index dependency_index : query_dag_->getDependencies(node_index)) {
-      // If at least one of the dependencies is not met, return false.
-      if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if all the blocking dependencies of the node at specified
-   *        index have finished their execution.
-   *
-   * @note A blocking dependency is the one which is pipeline breaker. Output of
-   *       a dependency can't be streamed to its dependent if the link between
-   *       them is pipeline breaker.
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the blocking dependencies have finished their
-   *         execution. False otherwise.
-   **/
-  inline bool checkAllBlockingDependenciesMet(const dag_node_index node_index) const {
-    for (const dag_node_index blocking_dependency_index : blocking_dependencies_[node_index]) {
-      if (!query_exec_state_->hasExecutionFinished(blocking_dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   /**
    * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the
    *        worker threads.
    *
-   * @param start_worker_index The dispatch of WorkOrders preferably begins with
-   *        the worker at this index.
-   * @param start_operator_index The search for a schedulable WorkOrder
-   *        begins with the WorkOrders generated by this operator.
-   **/
-  void dispatchWorkerMessages(const std::size_t start_worker_index,
-                              const dag_node_index start_operator_index);
-
-  /**
-   * @brief Initialize all the local vectors and maps. If the operator has an
-   *        InsertDestination, pass the bus address and Foreman's TMB client ID
-   *        to it.
-   **/
-  void initializeState();
-
-  /**
-   * @brief Initialize the Foreman before starting the event loop. This binds
-   * the Foreman thread to configured CPU, and does initial processing of
-   * operator before waiting for events from Workers.
-   **/
-  void initialize();
-
-  /**
-   * @brief Process the received WorkOrder complete message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the completed WorkOrder.
-   * @param worker_thread_index The logical index of the worker thread in
-   *        WorkerDirectory for the completed WorkOrder.
-   **/
-  void processWorkOrderCompleteMessage(const dag_node_index op_index,
-                                       const std::size_t worker_thread_index);
-
-  /**
-   * @brief Process the received RebuildWorkOrder complete message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the completed RebuildWorkOrder.
-   * @param worker_thread_index The logical index of the worker thread in
-   *        WorkerDirectory for the completed RebuildWorkOrder.
-   **/
-  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
-                                              const std::size_t worker_thread_index);
-
-  /**
-   * @brief Process the received data pipeline message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the pipelining block.
-   * @param block The block id.
-   * @param rel_id The ID of the relation that produced 'block'.
-   **/
-  void processDataPipelineMessage(const dag_node_index op_index,
-                                  const block_id block,
-                                  const relation_id rel_id);
-
-  /**
-   * @brief Process the received work order feedback message and notify relational
-   *        operator.
-   *
-   * @param message Feedback message from work order.
-   **/
-  void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
-
-  /**
-   * @brief Clear some of the vectors used for a single run of a query.
-   **/
-  void cleanUp() {
-    output_consumers_.clear();
-    blocking_dependencies_.clear();
-  }
-
-  /**
-   * @brief Process a current relational operator: Get its workorders and store
-   *        them in the WorkOrdersContainer for this query. If the operator can
-   *        be marked as done, do so.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   * @param recursively_check_dependents If an operator is done, should we
-   *        call processOperator on its dependents recursively.
+   * @param messages The messages to be dispatched.
    **/
-  void processOperator(const dag_node_index index, const bool recursively_check_dependents);
-
- /**
-   * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   * @param numa_node The next WorkOrder should preferably have its input(s)
-   *        from this numa_node. This is a hint and not a binding requirement.
-   *
-   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
-   *         executed, return NULL.
-   **/
-  WorkerMessage* getNextWorkerMessage(
-      const dag_node_index start_operator_index, const int numa_node = -1);
+  void dispatchWorkerMessages(
+      const std::vector<std::unique_ptr<WorkerMessage>> &messages);
 
   /**
    * @brief Send the given message to the specified worker.
@@ -306,156 +98,24 @@ class Foreman final : public ForemanLite {
    *        in WorkerDirectory.
    * @param message The WorkerMessage to be sent.
    **/
-  void sendWorkerMessage(const std::size_t worker_thread_index, const WorkerMessage &message);
+  void sendWorkerMessage(const std::size_t worker_thread_index,
+                         const WorkerMessage &message);
 
   /**
-   * @brief Fetch all work orders currently available in relational operator and
-   *        store them internally.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
    *
-   * @return Whether any work order was generated by op.
+   * @param message_type The type of the last received message.
    **/
-  bool fetchNormalWorkOrders(const dag_node_index index);
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
 
-  /**
-   * @brief This function does the following things:
-   *        1. Mark the given relational operator as "done".
-   *        2. For all the dependents of this operator, check if all of their
-   *        blocking dependencies are met. If so inform them that the blocking
-   *        dependencies are met.
-   *        3. Check if the given operator is done producing output. If it's
-   *        done, inform the dependents that they won't receive input anymore
-   *        from the given operator.
-   *
-   * @param index The index of the given relational operator in the DAG.
-   **/
-  void markOperatorFinished(const dag_node_index index);
+  const tmb::client_id main_thread_client_id_;
 
-  /**
-   * @brief Check if the execution of the given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkOperatorExecutionOver(const dag_node_index index) const {
-    if (checkRebuildRequired(index)) {
-      return (checkNormalExecutionOver(index) && checkRebuildOver(index));
-    } else {
-      return checkNormalExecutionOver(index);
-    }
-  }
-
-  /**
-   * @brief Check if the given operator's normal execution is over.
-   *
-   * @note The conditions for a given operator's normal execution to get over:
-   *       1. All of its  normal (i.e. non rebuild) WorkOrders have finished
-   *       execution.
-   *       2. The operator is done generating work orders.
-   *       3. All of the dependencies of the given operator have been met.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the normal execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkNormalExecutionOver(const dag_node_index index) const {
-    return (checkAllDependenciesMet(index) &&
-            !workorders_container_->hasNormalWorkOrder(index) &&
-            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
-            query_exec_state_->hasDoneGenerationWorkOrders(index));
-  }
-
-  /**
-   * @brief Check if the rebuild operation is required for a given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is required, false otherwise.
-   **/
-  inline bool checkRebuildRequired(const dag_node_index index) const {
-    return query_exec_state_->isRebuildRequired(index);
-  }
-
-  /**
-   * @brief Check if the rebuild operation for a given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is over, false otherwise.
-   **/
-  inline bool checkRebuildOver(const dag_node_index index) const {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           !workorders_container_->hasRebuildWorkOrder(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  }
-
-  /**
-   * @brief Check if the rebuild operation for a given operator has been
-   *        initiated.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation has been initiated, false otherwise.
-   **/
-  inline bool checkRebuildInitiated(const dag_node_index index) const {
-    return query_exec_state_->hasRebuildInitiated(index);
-  }
-
-  /**
-   * @brief Initiate the rebuild process for partially filled blocks generated
-   *        during the execution of the given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild is over immediately, i.e. the operator didn't
-   *         generate any rebuild WorkOrders, false otherwise.
-   **/
-  bool initiateRebuild(const dag_node_index index);
-
-  /**
-   * @brief Get the rebuild WorkOrders for an operator.
-   *
-   * @note This function should be called only once, when all the normal
-   *       WorkOrders generated by an operator finish their execution.
-   *
-   * @param index The index of the operator in the query plan DAG.
-   * @param container A pointer to a WorkOrdersContainer to be used to store the
-   *        generated WorkOrders.
-   **/
-  void getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container);
+  WorkerDirectory *worker_directory_;
 
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
-  DAG<RelationalOperator, bool> *query_dag_;
-
-  std::unique_ptr<QueryContext> query_context_;
-
-  // During a single round of WorkOrder dispatch, a Worker should be allocated
-  // at most these many WorkOrders.
-  std::size_t max_msgs_per_worker_;
-
-  // For all nodes, store their receiving dependents.
-  std::vector<std::vector<dag_node_index>> output_consumers_;
-
-  // For all nodes, store their pipeline breaking dependencies (if any).
-  std::vector<std::vector<dag_node_index>> blocking_dependencies_;
-
-  std::unique_ptr<QueryExecutionState> query_exec_state_;
-
-  std::unique_ptr<WorkOrdersContainer> workorders_container_;
-
-  const int num_numa_nodes_;
-
-  WorkerDirectory *workers_;
-
-  friend class ForemanTest;
-  FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
+  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
 
   DISALLOW_COPY_AND_ASSIGN(Foreman);
 };



[2/4] incubator-quickstep git commit: Created QueryManager class and tests.

Posted by hb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46fb7ae 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -1269,9 +1271,12 @@ class SortMergeRunOperatorTest : public ::testing::Test {
     ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
     ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
 
+    query_context_proto_.set_query_id(0);  // dummy query ID.
+
     // Setup the InsertDestination proto in the query context proto.
     insert_destination_index_ = query_context_proto_.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto_.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_id);
@@ -1291,6 +1296,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
 
     run_destination_index_ = query_context_proto_.insert_destinations_size();
     insert_destination_proto = query_context_proto_.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto_.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(run_table_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..3eeb7e9 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,10 +328,12 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
                    const std::vector<bool> &null_ordering) {
     // Setup the InsertDestination proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
 
     const QueryContext::insert_destination_id insert_destination_index =
         query_context_proto.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index ef6fc2d..a972a37 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -180,9 +180,11 @@ TEST_F(TextScanOperatorTest, ScanTest) {
 
   // Setup the InsertDestination proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size();
   serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations();
+  output_destination_proto->set_query_id(query_context_proto.query_id());
 
   output_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   output_destination_proto->set_relation_id(relation_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..5e4dd28 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
     : thread_id_map_(*ClientIDMap::Instance()),
@@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
       relation_(relation),
       layout_(layout),
       relational_op_index_(relational_op_index),
+      query_id_(query_id),
       scheduler_client_id_(scheduler_client_id),
       bus_(DCHECK_NOTNULL(bus)) {
   if (layout_ == nullptr) {
@@ -74,11 +76,12 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
   }
 }
 
-InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto,
-                                                           const CatalogRelationSchema &relation,
-                                                           StorageManager *storage_manager,
-                                                           const tmb::client_id scheduler_client_id,
-                                                           tmb::MessageBus *bus) {
+InsertDestination* InsertDestination::ReconstructFromProto(
+    const serialization::InsertDestination &proto,
+    const CatalogRelationSchema &relation,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
   DCHECK(ProtoIsValid(proto, relation));
 
   StorageBlockLayout *layout = nullptr;
@@ -93,6 +96,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                                     layout,
                                                     storage_manager,
                                                     proto.relational_op_index(),
+                                                    proto.query_id(),
                                                     scheduler_client_id,
                                                     bus);
     }
@@ -107,6 +111,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                             storage_manager,
                                             move(blocks),
                                             proto.relational_op_index(),
+                                            proto.query_id(),
                                             scheduler_client_id,
                                             bus);
     }
@@ -134,6 +139,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
           storage_manager,
           move(partitions),
           proto.relational_op_index(),
+          proto.query_id(),
           scheduler_client_id,
           bus);
     }
@@ -262,6 +268,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -309,6 +316,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -385,21 +393,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna
   return done_block_ids_;
 }
 
-PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                                                 const CatalogRelationSchema &relation,
-                                                                 const StorageBlockLayout *layout,
-                                                                 StorageManager *storage_manager,
-                                                                 vector<vector<block_id>> &&partitions,
-                                                                 const std::size_t relational_op_index,
-                                                                 const tmb::client_id scheduler_client_id,
-                                                                 tmb::MessageBus *bus)
-    : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+PartitionAwareInsertDestination::PartitionAwareInsertDestination(
+    PartitionSchemeHeader *partition_scheme_header,
+    const CatalogRelationSchema &relation,
+    const StorageBlockLayout *layout,
+    StorageManager *storage_manager,
+    vector<vector<block_id>> &&partitions,
+    const std::size_t relational_op_index,
+    const std::size_t query_id,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus)
+    : InsertDestination(relation,
+                        layout,
+                        storage_manager,
+                        relational_op_index,
+                        query_id,
+                        scheduler_client_id,
+                        bus),
       partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)),
       available_block_refs_(partition_scheme_header_->getNumPartitions()),
       available_block_ids_(move(partitions)),
       done_block_ids_(partition_scheme_header_->getNumPartitions()),
-      mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) {
-}
+      mutexes_for_partition_(
+          new SpinMutex[partition_scheme_header_->getNumPartitions()]) {}
 
 MutableBlockReference PartitionAwareInsertDestination::createNewBlock() {
   FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument.");
@@ -415,6 +431,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
   proto.set_partition_id(part_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..6968149 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @param storage_manager The StorageManager to use.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of this query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
@@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface {
                     const StorageBlockLayout *layout,
                     StorageManager *storage_manager,
                     const std::size_t relational_op_index,
+                    const std::size_t query_id,
                     const tmb::client_id scheduler_client_id,
                     tmb::MessageBus *bus);
 
@@ -211,6 +213,7 @@ class InsertDestination : public InsertDestinationInterface {
     proto.set_operator_index(relational_op_index_);
     proto.set_block_id(id);
     proto.set_relation_id(relation_.getID());
+    proto.set_query_id(query_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -253,6 +256,10 @@ class InsertDestination : public InsertDestinationInterface {
         " ID " << scheduler_client_id_;
   }
 
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
   const ClientIDMap &thread_id_map_;
 
   StorageManager *storage_manager_;
@@ -260,6 +267,7 @@ class InsertDestination : public InsertDestinationInterface {
 
   std::unique_ptr<const StorageBlockLayout> layout_;
   const std::size_t relational_op_index_;
+  const std::size_t query_id_;
 
   tmb::client_id scheduler_client_id_;
   tmb::MessageBus *bus_;
@@ -288,10 +296,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   ~AlwaysCreateBlockInsertDestination() override {
   }
@@ -334,16 +348,23 @@ class BlockPoolInsertDestination : public InsertDestination {
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
+   * @param query_id The ID of the query.
    * @param bus A pointer to the TMB.
    **/
   BlockPoolInsertDestination(const CatalogRelationSchema &relation,
                              const StorageBlockLayout *layout,
                              StorageManager *storage_manager,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   /**
    * @brief Constructor.
@@ -363,9 +384,16 @@ class BlockPoolInsertDestination : public InsertDestination {
                              StorageManager *storage_manager,
                              std::vector<block_id> &&blocks,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus),
         available_block_ids_(std::move(blocks)) {
     // TODO(chasseur): Once block fill statistics are available, replace this
     // with something smarter.
@@ -386,7 +414,6 @@ class BlockPoolInsertDestination : public InsertDestination {
   MutableBlockReference createNewBlock() override;
 
  private:
-  FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
   FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
 
   // A vector of references to blocks which are loaded in memory.
@@ -416,17 +443,20 @@ class PartitionAwareInsertDestination : public InsertDestination {
    * @param partitions The blocks in partitions.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of the query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
-  PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                  const CatalogRelationSchema &relation,
-                                  const StorageBlockLayout *layout,
-                                  StorageManager *storage_manager,
-                                  std::vector<std::vector<block_id>> &&partitions,
-                                  const std::size_t relational_op_index,
-                                  const tmb::client_id scheduler_client_id,
-                                  tmb::MessageBus *bus);
+  PartitionAwareInsertDestination(
+      PartitionSchemeHeader *partition_scheme_header,
+      const CatalogRelationSchema &relation,
+      const StorageBlockLayout *layout,
+      StorageManager *storage_manager,
+      std::vector<std::vector<block_id>> &&partitions,
+      const std::size_t relational_op_index,
+      const std::size_t query_id,
+      const tmb::client_id scheduler_client_id,
+      tmb::MessageBus *bus);
 
   ~PartitionAwareInsertDestination() override {
     delete[] mutexes_for_partition_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.proto
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.proto b/storage/InsertDestination.proto
index 6083539..a3d8acf 100644
--- a/storage/InsertDestination.proto
+++ b/storage/InsertDestination.proto
@@ -34,6 +34,7 @@ message InsertDestination {
   optional StorageBlockLayoutDescription layout = 3;
 
   required uint64 relational_op_index = 4;
+  required uint64 query_id = 5;
 
   // The convention for extension numbering is that extensions for a particular
   // tInsertDestinationType should begin from (insert_destination_type + 1) * 16.


[3/4] incubator-quickstep git commit: Created QueryManager class and tests.

Posted by hb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
new file mode 100644
index 0000000..1ee1df9
--- /dev/null
+++ b/query_execution/PolicyEnforcer.cpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      admitted_queries_[query_id].reset(
+          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                           catalog_database_, storage_manager_, bus_));
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+  // TODO(harshad) : Provide processXMessage() public functions in
+  // QueryManager, so that we need to extract message from the
+  // TaggedMessage only once.
+  std::size_t query_id;
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage:  // Fall through.
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::WorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      // TODO(harshad) Add query ID to FeedbackMessage.
+      WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
+      query_id = msg.header().query_id;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+  }
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  const QueryManager::QueryStatusCode return_code =
+      admitted_queries_[query_id]->processMessage(tagged_message);
+  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+void PolicyEnforcer::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(worker_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  std::size_t per_query_share = 0;
+  if (!admitted_queries_.empty()) {
+    per_query_share = kMaxNumWorkerMessages / admitted_queries_.size();
+  } else {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+  DCHECK_GT(per_query_share, 0u);
+  std::vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManager *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      WorkerMessage *next_worker_message =
+          curr_query_manager->getNextWorkerMessage(0, -1);
+      if (next_worker_message != nullptr) {
+        ++messages_collected_curr_query;
+        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+      } else {
+        // No more work ordes from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  for (std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+void PolicyEnforcer::removeQuery(const std::size_t query_id) {
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+    LOG(WARNING) << "Removing query with ID " << query_id
+                 << " that hasn't finished its execution";
+  }
+  admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcer::admitQueries(
+    const std::vector<QueryHandle *> &query_handles) {
+  for (QueryHandle *curr_query : query_handles) {
+    if (!admitQuery(curr_query)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
new file mode 100644
index 0000000..d4ba643
--- /dev/null
+++ b/query_execution/PolicyEnforcer.hpp
@@ -0,0 +1,168 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcer {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param bus The TMB.
+   **/
+  PolicyEnforcer(const tmb::client_id foreman_client_id,
+                 const std::size_t num_numa_nodes,
+                 CatalogDatabaseLite *catalog_database,
+                 StorageManager *storage_manager,
+                 tmb::MessageBus *bus)
+      : foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        catalog_database_(catalog_database),
+        storage_manager_(storage_manager),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcer() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  bool admitQuery(QueryHandle *query_handle);
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note In the current simple implementation, we only allow one active
+   *       query in the system. Other queries will have to wait.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+ private:
+  static constexpr std::size_t kMaxConcurrentQueries = 1;
+  static constexpr std::size_t kMaxNumWorkerMessages = 20;
+
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  CatalogDatabaseLite *catalog_database_;
+  StorageManager *storage_manager_;
+
+  tmb::MessageBus *bus_;
+
+  // Key = query ID, value = QueryManager* for the key query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 3bfce17..5c4c44e 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -56,7 +56,8 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
                            const CatalogDatabaseLite &database,
                            StorageManager *storage_manager,
                            const tmb::client_id scheduler_client_id,
-                           tmb::MessageBus *bus) {
+                           tmb::MessageBus *bus)
+    : query_id_(proto.query_id()) {
   DCHECK(ProtoIsValid(proto, database))
       << "Attempted to create QueryContext from an invalid proto description:\n"
       << proto.DebugString();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7d5628d..83627bc 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -460,6 +460,13 @@ class QueryContext {
     return update_groups_[id];
   }
 
+  /**
+   * @param Get the ID of this query.
+   **/
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
  private:
   std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
   std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
@@ -472,6 +479,8 @@ class QueryContext {
   std::vector<std::unique_ptr<Tuple>> tuples_;
   std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
 
+  const std::size_t query_id_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryContext);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b37286c..98cd0b6 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -54,4 +54,6 @@ message QueryContext {
 
   // NOTE(zuyu): For UpdateWorkOrder only.
   repeated UpdateGroup update_groups = 10;
+
+  required uint64 query_id = 11;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 15803cf..9d9a9e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -27,6 +27,7 @@ message EmptyMessage {
 message WorkOrderCompletionMessage {
   required uint64 operator_index = 1;
   required uint64 worker_thread_index = 2;
+  required uint64 query_id = 3;
 }
 
 message CatalogRelationNewBlockMessage {
@@ -35,16 +36,19 @@ message CatalogRelationNewBlockMessage {
 
   // Used by PartitionAwareInsertDestination.
   optional uint64 partition_id = 3;
+  required uint64 query_id = 4;
 }
 
 message DataPipelineMessage {
   required uint64 operator_index = 1;
   required fixed64 block_id = 2;
   required int32 relation_id = 3;
+  required uint64 query_id = 4;
 }
 
 message WorkOrdersAvailableMessage {
   required uint64 operator_index = 1;
+  required uint64 query_id = 2;
 }
 
 // BlockLocator related messages.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fc253bc..fa97158 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -58,6 +58,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
 
 // We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
+  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted.
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
@@ -67,7 +68,8 @@ enum QueryExecutionMessageType : message_type_id {
                               // their corresponding RelationalOperators.
   kRebuildWorkOrderMessage,  // From Foreman to Worker.
   kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
-  kPoisonMessage,  // From the CLI shell to Foreman, then from Foreman to Workers.
+  kWorkloadCompletionMessage,  // From Foreman to main thread.
+  kPoisonMessage,  // From the main thread to Foreman and workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
   // BlockLocator related messages, sorted in a life cycle of StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index a8b6a38..78fd159 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -17,9 +17,12 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 
+#include <memory>
 #include <utility>
 
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/address.h"
@@ -60,6 +63,55 @@ class QueryExecutionUtil {
                      std::move(tagged_message));
   }
 
+  /**
+   * @brief Construct and send an AdmitRequestMessage from a given sender to a
+   *        given recipient.
+   *
+   * @param sender_id The TMB client ID of the sender.
+   * @param receiver_id The TMB client ID of the receiver.
+   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+   * @param bus A pointer to the TMB.
+   * @param tagged_message A moved from reference to the tagged message.
+   *
+   * @return A status code indicating the result of the message delivery.
+   *         The caller should ensure that the status is SendStatus::kOK.
+   **/
+  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+      const tmb::client_id sender_id,
+      const tmb::client_id receiver_id,
+      QueryHandle *query_handle,
+      MessageBus *bus) {
+    std::unique_ptr<AdmitRequestMessage> request_message(
+        new AdmitRequestMessage(query_handle));
+    const std::size_t size_of_request_msg = sizeof(*request_message);
+    TaggedMessage admit_tagged_message(
+        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+    return QueryExecutionUtil::SendTMBMessage(
+        bus, sender_id, receiver_id, std::move(admit_tagged_message));
+  }
+
+  static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
+    // Terminate all threads.
+    // The sender thread broadcasts poison message to the workers and foreman.
+    // Each worker dies after receiving poison message. The order of workers'
+    // death is irrelavant.
+    MessageStyle style;
+    style.Broadcast(true);
+    Address address;
+    address.All(true);
+    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+    TaggedMessage poison_tagged_message(poison_message.get(),
+                                        sizeof(*poison_message),
+                                        kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus send_status = bus->Send(
+        sender_id, address, style, std::move(poison_tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+       "Broadcast poison message from sender with TMB client ID " << sender_id
+       << " failed";
+  }
+
  private:
   /**
    * @brief Constructor. Made private to avoid instantiation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
index 47f54c5..b52460f 100644
--- a/query_execution/QueryManager.hpp
+++ b/query_execution/QueryManager.hpp
@@ -25,18 +25,21 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/DAG.hpp"
 #include "utility/Macros.hpp"
 
+#include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
 namespace quickstep {
 
 class CatalogDatabaseLite;
-class ForemanMessage;
 class QueryHandle;
 class StorageManager;
 class WorkerMessage;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index eb9aedd..6ed6474 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -28,6 +28,8 @@
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -76,7 +78,7 @@ class WorkOrdersContainer {
    * @return If there are pending WorkOrders.
    **/
   inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return normal_workorders_[operator_index].hasWorkOrder();
   }
 
@@ -92,9 +94,9 @@ class WorkOrdersContainer {
    **/
   inline bool hasNormalWorkOrderForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -108,7 +110,7 @@ class WorkOrdersContainer {
    * @return If there are pending rebuild WorkOrders.
    **/
   inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return rebuild_workorders_[operator_index].hasWorkOrder();
   }
 
@@ -124,9 +126,9 @@ class WorkOrdersContainer {
    **/
   inline bool hasRebuildWorkOrderForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -144,9 +146,9 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
                                            const int numa_node_id) {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return normal_workorders_[operator_index].getWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -164,7 +166,7 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
                                 const bool prefer_single_NUMA_node = true) {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return normal_workorders_[operator_index].getWorkOrder(
         prefer_single_NUMA_node);
   }
@@ -182,9 +184,9 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
                                             const int numa_node_id) {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -202,7 +204,7 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
                                  const bool prefer_single_NUMA_node = true) {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return rebuild_workorders_[operator_index].getWorkOrder(
         prefer_single_NUMA_node);
   }
@@ -220,8 +222,8 @@ class WorkOrdersContainer {
    * @param operator_index The index of the operator in the query DAG.
    **/
   void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
-    DEBUG_ASSERT(workorder != nullptr);
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(workorder != nullptr);
+    DCHECK(operator_index < num_operators_);
     normal_workorders_[operator_index].addWorkOrder(workorder);
   }
 
@@ -238,8 +240,8 @@ class WorkOrdersContainer {
    **/
   void addRebuildWorkOrder(WorkOrder *workorder,
                            const std::size_t operator_index) {
-    DEBUG_ASSERT(workorder != nullptr);
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(workorder != nullptr);
+    DCHECK(operator_index < num_operators_);
     rebuild_workorders_[operator_index].addWorkOrder(workorder);
   }
 
@@ -254,9 +256,9 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumNormalWorkOrdersForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
         numa_node_id);
   }
@@ -271,7 +273,7 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumNormalWorkOrders(
       const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return normal_workorders_[operator_index].getNumWorkOrders();
   }
 
@@ -286,9 +288,9 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumRebuildWorkOrdersForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK(operator_index < num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
     return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
         numa_node_id);
   }
@@ -303,7 +305,7 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumRebuildWorkOrders(
       const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(operator_index < num_operators_);
     return rebuild_workorders_[operator_index].getNumWorkOrders();
   }
 
@@ -418,8 +420,8 @@ class WorkOrdersContainer {
     void addWorkOrder(WorkOrder *workorder);
 
     bool hasWorkOrderForNUMANode(const int numa_node_id) const {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
       return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
              multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
                  numa_node_id);
@@ -440,8 +442,8 @@ class WorkOrdersContainer {
 
     std::size_t getNumWorkOrdersForNUMANode(
         const int numa_node_id) const {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
       return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
              multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
                  numa_node_id);
@@ -463,8 +465,8 @@ class WorkOrdersContainer {
     }
 
     WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
       WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
       if (work_order == nullptr) {
         work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 645fd05..ef596e1 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -58,11 +58,14 @@ void Worker::run() {
         WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
         DCHECK(message.getWorkOrder() != nullptr);
         message.getWorkOrder()->execute();
+        const std::size_t query_id_for_workorder =
+            message.getWorkOrder()->getQueryID();
         delete message.getWorkOrder();
 
-        sendWorkOrderCompleteMessage(annotated_msg.sender,
-                                     message.getRelationalOpIndex(),
-                                     tagged_message.message_type() == kRebuildWorkOrderMessage);
+        sendWorkOrderCompleteMessage(
+            annotated_msg.sender, message.getRelationalOpIndex(),
+            query_id_for_workorder,
+            tagged_message.message_type() == kRebuildWorkOrderMessage);
         break;
       }
       case kPoisonMessage: {
@@ -76,10 +79,12 @@ void Worker::run() {
 
 void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
                                           const size_t op_index,
+                                          const size_t query_id,
                                           const bool is_rebuild_work_order) {
   serialization::WorkOrderCompletionMessage proto;
   proto.set_operator_index(op_index);
   proto.set_worker_thread_index(worker_thread_index_);
+  proto.set_query_id(query_id);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index b94e937..c0bafdc 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -97,11 +97,13 @@ class Worker : public Thread {
    *
    * @param receiver The id of the TMB client which should receive the response.
    * @param op_index The index of the operator to which the WorkOrder belongs.
+   * @param query_id The ID of the query which the WorkOrder belongs to.
    * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
    *        false.
    **/
   void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
                                     const std::size_t op_index,
+                                    const std::size_t query_id,
                                     const bool is_rebuild_work_order);
 
   const std::size_t worker_thread_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index ec63af9..7adf2d3 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -105,6 +105,23 @@ class WorkerMessage {
     return type_;
   }
 
+  /**
+   * @brief Set a hint for the recipient worker thread.
+   *
+   * @param recipient_index_hint The hint i.e. the worker thread index.
+   **/
+  inline void setRecipientHint(const int recipient_index_hint) {
+    recipient_index_hint_ = recipient_index_hint;
+  }
+
+  /**
+   * @brief Get the hint for the recipient worker thread. The hint is invalid if
+   *        it is -1.
+   **/
+  inline int getRecipientHint() const {
+    return recipient_index_hint_;
+  }
+
  private:
   /**
    * @brief Constructor.
@@ -120,12 +137,13 @@ class WorkerMessage {
                 const WorkerMessageType type)
       : work_unit_(work_unit),
         relational_op_index_(relational_op_index),
-        type_(type) {
-  }
+        type_(type),
+        recipient_index_hint_(-1) {}
 
   WorkOrder *work_unit_;
   const std::size_t relational_op_index_;
   const WorkerMessageType type_;
+  int recipient_index_hint_;
 };
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 308d5ca..62642ac 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -250,6 +250,7 @@ class QueryManagerTest : public ::testing::Test {
 
     proto.set_block_id(0);  // dummy block ID
     proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -270,6 +271,7 @@ class QueryManagerTest : public ::testing::Test {
     serialization::WorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const size_t proto_length = proto.ByteSize();
@@ -291,6 +293,7 @@ class QueryManagerTest : public ::testing::Test {
     serialization::WorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker thread ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const size_t proto_length = proto.ByteSize();
@@ -314,6 +317,7 @@ class QueryManagerTest : public ::testing::Test {
 
     proto.set_block_id(0);  // dummy block ID
     proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -712,11 +716,13 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
   // Setup the InsertDestination proto in the query context proto.
   serialization::QueryContext *query_context_proto =
       query_handle_->getQueryContextProtoMutable();
+  query_context_proto->set_query_id(0);  // dummy query ID.
 
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto->insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto =
       query_context_proto->add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto->query_id());
 
   insert_destination_proto->set_insert_destination_type(
       serialization::InsertDestinationType::BLOCK_POOL);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/WorkOrdersContainer_unittest.cpp b/query_execution/tests/WorkOrdersContainer_unittest.cpp
index cf133c4..cb583ab 100644
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@ -72,6 +72,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
   // they get inserted and retrieved correctly.
   std::vector<int> numa_node_ids;
   // A container for one operator and no NUMA nodes.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 0);
 
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -104,11 +105,15 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
   ASSERT_TRUE(returned_work_order != nullptr);
   EXPECT_EQ(work_order.getID(), static_cast<MockNUMAWorkOrder*>(returned_work_order)->getID());
 
+  EXPECT_EQ(query_id, returned_work_order->getQueryID());
+
   WorkOrder *returned_rebuild_work_order = w.getRebuildWorkOrder(0);
   ASSERT_TRUE(returned_rebuild_work_order != nullptr);
   EXPECT_EQ(work_order1.getID(),
             static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
 
+  EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
+
   // Container should be empty now.
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
   EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@ -123,6 +128,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
   // if they get inserted and retrieved correctly and the order of retrieval.
   // A container for one operator and no NUMA nodes.
   std::vector<int> numa_node_ids;
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 0);
 
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -164,6 +170,8 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
     ASSERT_TRUE(returned_work_order != nullptr);
     EXPECT_EQ(static_cast<int>(kNumWorkOrders + i),
               static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+    EXPECT_EQ(query_id, returned_work_order->getQueryID());
+    EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
   }
 
   // Container should be empty now.
@@ -190,6 +198,7 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
   const std::size_t kNUMANodesUsed = numa_node_ids.size();
 
   // A container for one operator and kNUMANodes.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, kNUMANodes);
 
   for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
@@ -246,6 +255,9 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
     ASSERT_TRUE(returned_rebuild_work_order != nullptr);
     EXPECT_EQ(rebuild_workorders[i].getID(),
               static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+
+    EXPECT_EQ(query_id, returned_work_order->getQueryID());
+    EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
   }
 
   // No workorder should be left for this operator on any NUMA node.
@@ -291,6 +303,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   const std::size_t kNUMANodesUsed = numa_nodes.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, kNUMANodes);
 
   w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@ -331,6 +344,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
       w.getNormalWorkOrderForNUMANode(0, numa_nodes[0]));
   ASSERT_TRUE(observed_work_order != nullptr);
 
+  EXPECT_EQ(query_id, observed_work_order->getQueryID());
   EXPECT_EQ(one_numa_work_order.getPreferredNUMANodes().front(),
             observed_work_order->getPreferredNUMANodes().front());
   EXPECT_EQ(one_numa_work_order.getID(), observed_work_order->getID());
@@ -348,6 +362,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   EXPECT_EQ(no_numa_work_order.getID(),
             static_cast<MockNUMAWorkOrder *>(observed_non_numa_work_order)->getID());
 
+  EXPECT_EQ(query_id, observed_non_numa_work_order->getQueryID());
   EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[0]));
   EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[1]));
 
@@ -361,6 +376,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   ASSERT_TRUE(observed_work_order_multiple_numa_nodes != nullptr);
   EXPECT_EQ(multiple_numa_work_order.getID(), observed_work_order_multiple_numa_nodes->getID());
 
+  EXPECT_EQ(query_id, observed_work_order_multiple_numa_nodes->getQueryID());
   std::vector<int> observed_numa_nodes(
       observed_work_order_multiple_numa_nodes->getPreferredNUMANodes());
   // Look up the expected numa nodes in the observed_numa_nodes vector.
@@ -427,6 +443,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
   const std::size_t kNUMANodes = numa_node_ids.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(kNumOperators, kNUMANodes);
 
   std::vector<std::size_t> operator_ids;
@@ -538,6 +555,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
             curr_operator_id, single_numa_node_id));
     ASSERT_TRUE(observed_work_order_single_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_single_numa =
         normal_workorders_one_numa_ids[curr_operator_id];
@@ -550,6 +568,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
             curr_operator_id, multiple_numa_node_id));
     ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_multiple_numa =
         normal_workorders_multiple_numa_ids[curr_operator_id];
@@ -562,6 +581,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
         static_cast<MockNUMAWorkOrder *>(w.getNormalWorkOrder(curr_operator_id));
     ASSERT_TRUE(observed_work_order_no_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_no_numa =
         normal_workorders_no_numa_ids[curr_operator_id];
@@ -620,6 +640,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
   const std::size_t kNUMANodes = numa_node_ids.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(kNumOperators, kNUMANodes);
 
   std::vector<std::size_t> operator_ids;
@@ -732,6 +753,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
             curr_operator_id, single_numa_node_id));
     ASSERT_TRUE(observed_work_order_single_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_single_numa =
         rebuild_workorders_one_numa_ids[curr_operator_id];
@@ -744,6 +766,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
             curr_operator_id, multiple_numa_node_id));
     ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_multiple_numa =
         rebuild_workorders_multiple_numa_ids[curr_operator_id];
@@ -755,6 +778,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
     MockNUMAWorkOrder *observed_work_order_no_numa =
         static_cast<MockNUMAWorkOrder *>(w.getRebuildWorkOrder(curr_operator_id));
 
+    EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_no_numa =
         rebuild_workorders_no_numa_ids[curr_operator_id];
@@ -772,6 +796,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
   numa_node_ids.push_back(0);
   const std::size_t kNumWorkOrdersPerType = 100;
 
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 2);
 
   std::vector<int> single_numa_node_workorder_ids;
@@ -820,6 +845,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
     MockNUMAWorkOrder *observed_work_order = static_cast<MockNUMAWorkOrder *>(
         w.getNormalWorkOrder(0, prefer_single_NUMA_node));
     ASSERT_TRUE(observed_work_order != nullptr);
+    EXPECT_EQ(query_id, observed_work_order->getQueryID());
     if (prefer_single_NUMA_node) {
       EXPECT_EQ(*single_numa_it, observed_work_order->getID());
       EXPECT_EQ(1u, observed_work_order->getPreferredNUMANodes().size());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 99c2a21..f9595c9 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -401,7 +401,7 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
   S::InsertDestination *insert_destination_proto =
-      query_context_proto_->add_insert_destinations();
+      addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_sample,
                                  &output_relation,
                                  insert_destination_proto);
@@ -513,7 +513,7 @@ void ExecutionGenerator::convertSelection(
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_selection,
                                  &output_relation,
                                  insert_destination_proto);
@@ -753,7 +753,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_plan,
                                  &output_relation,
                                  insert_destination_proto);
@@ -879,7 +879,7 @@ void ExecutionGenerator::convertNestedLoopsJoin(
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_plan,
                                  &output_relation,
                                  insert_destination_proto);
@@ -926,7 +926,7 @@ void ExecutionGenerator::convertCopyFrom(
   // Create InsertDestination proto.
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
 
   insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation->getID());
@@ -1153,7 +1153,7 @@ void ExecutionGenerator::convertInsertTuple(
   // Create InsertDestination proto.
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
 
   insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(input_relation.getID());
@@ -1210,7 +1210,7 @@ void ExecutionGenerator::convertInsertSelection(
   // Create InsertDestination proto.
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
 
   insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(destination_relation.getID());
@@ -1281,7 +1281,7 @@ void ExecutionGenerator::convertUpdateTable(
   // Create InsertDestination proto.
   const QueryContext::insert_destination_id relocation_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *relocation_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *relocation_destination_proto = addNewInsertDestinationToQueryContext();
 
   relocation_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
   relocation_destination_proto->set_relation_id(input_rel_id);
@@ -1447,7 +1447,7 @@ void ExecutionGenerator::convertAggregate(
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
-  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_plan,
                                  &output_relation,
                                  insert_destination_proto);
@@ -1497,7 +1497,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   const QueryContext::insert_destination_id initial_runs_destination_id =
       query_context_proto_->insert_destinations_size();
   S::InsertDestination *initial_runs_destination_proto =
-      query_context_proto_->add_insert_destinations();
+      addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(
       physical_sort, &initial_runs_relation, initial_runs_destination_proto);
 
@@ -1543,7 +1543,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   const QueryContext::insert_destination_id merged_runs_destination_id =
     query_context_proto_->insert_destinations_size();
   S::InsertDestination *merged_runs_destination_proto =
-    query_context_proto_->add_insert_destinations();
+    addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_sort,
                                  &merged_runs_relation,
                                  merged_runs_destination_proto);
@@ -1551,7 +1551,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   const QueryContext::insert_destination_id sorted_output_destination_id =
     query_context_proto_->insert_destinations_size();
   S::InsertDestination *sorted_output_destination_proto =
-    query_context_proto_->add_insert_destinations();
+    addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_sort,
                                  &sorted_relation,
                                  sorted_output_destination_proto);
@@ -1607,7 +1607,7 @@ void ExecutionGenerator::convertTableGenerator(
   const QueryContext::insert_destination_id insert_destination_index =
       query_context_proto_->insert_destinations_size();
   S::InsertDestination *insert_destination_proto =
-      query_context_proto_->add_insert_destinations();
+      addNewInsertDestinationToQueryContext();
   createTemporaryCatalogRelation(physical_tablegen,
                                  &output_relation,
                                  insert_destination_proto);
@@ -1636,5 +1636,11 @@ void ExecutionGenerator::convertTableGenerator(
   temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
 }
 
+S::InsertDestination* ExecutionGenerator::addNewInsertDestinationToQueryContext() {
+  S::InsertDestination *insert_destination_proto(query_context_proto_->add_insert_destinations());
+  insert_destination_proto->set_query_id(query_context_proto_->query_id());
+  return insert_destination_proto;
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 0630bca..219fd7a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -374,6 +374,14 @@ class ExecutionGenerator {
    */
   void dropAllTemporaryRelations();
 
+  /**
+   * @brief Add a new InsertDesetination to the QueryContext and set the
+   *        query ID for the InsertDestination.
+   *
+   * @return A pointer to the serialized InsertDestination.
+   **/
+  serialization::InsertDestination* addNewInsertDestinationToQueryContext();
+
   OptimizerContext *optimizer_context_;
   QueryHandle *query_handle_;
   QueryPlan *execution_plan_;  // A part of QueryHandle.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 07af404..c119be9 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -110,9 +110,11 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                       quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_Worker
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 56b53ba..930087a 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -24,7 +24,9 @@
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/Foreman.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_optimizer/ExecutionGenerator.hpp"
 #include "query_optimizer/LogicalGenerator.hpp"
@@ -40,6 +42,8 @@
 
 #include "glog/logging.h"
 
+#include "tmb/tagged_message.h"
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -90,13 +94,20 @@ void ExecutionGeneratorTestRunner::runTestCase(
             physical_generator.generatePlan(
                 logical_generator.generatePlan(*result.parsed_statement));
         execution_generator.generatePlan(physical_plan);
-        foreman_->setQueryPlan(
-            query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
-
-        foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
 
-        foreman_->start();
-        foreman_->join();
+        AdmitRequestMessage request_message(&query_handle);
+        TaggedMessage admit_tagged_message(
+            &request_message, sizeof(request_message), kAdmitRequestMessage);
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           main_thread_client_id_,
+                                           foreman_->getBusClientID(),
+                                           std::move(admit_tagged_message));
+
+        // Receive workload completion message from Foreman.
+        const AnnotatedMessage annotated_msg =
+            bus_.Receive(main_thread_client_id_, 0, true);
+        const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+        DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
 
         const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
         if (query_result_relation) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index 8352d55..9204073 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -61,9 +61,11 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
     bus_.Initialize();
 
-    foreman_.reset(new Foreman(&bus_,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager()));
+    main_thread_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);
+
     worker_.reset(new Worker(0, &bus_));
 
     std::vector<client_id> worker_client_ids;
@@ -75,27 +77,20 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
     workers_.reset(new WorkerDirectory(1 /* number of workers */,
                                        worker_client_ids, numa_nodes));
-    foreman_->setWorkerDirectory(workers_.get());
+    foreman_.reset(new Foreman(main_thread_client_id_,
+                               workers_.get(),
+                               &bus_,
+                               test_database_loader_.catalog_database(),
+                               test_database_loader_.storage_manager()));
 
+    foreman_->start();
     worker_->start();
   }
 
   ~ExecutionGeneratorTestRunner() {
-    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
-    TaggedMessage poison_tagged_message(poison_message.get(),
-                                        sizeof(*poison_message),
-                                        quickstep::kPoisonMessage);
-
-    Address worker_address;
-    MessageStyle single_receiver_style;
-
-    worker_address.AddRecipient(worker_->getBusClientID());
-    bus_.Send(foreman_->getBusClientID(),
-              worker_address,
-              single_receiver_style,
-              std::move(poison_tagged_message));
-
+    QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_);
     worker_->join();
+    foreman_->join();
   }
 
   void runTestCase(const std::string &input,
@@ -112,6 +107,8 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
   std::unique_ptr<WorkerDirectory> workers_;
 
+  tmb::client_id main_thread_client_id_;
+
   // This map is needed for InsertDestination and some operators that send
   // messages to Foreman directly. To know the reason behind the design of this
   // map, see the note in InsertDestination.hpp.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/query_optimizer/tests/TestDatabaseLoader.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.cpp b/query_optimizer/tests/TestDatabaseLoader.cpp
index 2de69b6..764ff2f 100644
--- a/query_optimizer/tests/TestDatabaseLoader.cpp
+++ b/query_optimizer/tests/TestDatabaseLoader.cpp
@@ -122,6 +122,7 @@ void TestDatabaseLoader::loadTestRelation() {
                                          nullptr,
                                          &storage_manager_,
                                          0 /* dummy op index */,
+                                         0,  // dummy query ID.
                                          scheduler_client_id_,
                                          &bus_);
   int sign = 1;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index f340d4e..6a8590a 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -111,6 +111,8 @@ class AggregationWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of this query.
    * @param input_block_id The block id.
+   * @param query_id The ID of this query.
+   * @param input_block_id The block id.
    * @param state The AggregationState to use.
    **/
   AggregationWorkOrder(const std::size_t query_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 15dc9e3..ec3bc20 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -96,6 +96,7 @@ void DeleteWorkOrder::execute() {
   proto.set_operator_index(delete_operator_index_);
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(input_relation_.getID());
+  proto.set_query_id(query_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index c55f585..fdc9b00 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -162,6 +162,7 @@ class DeleteWorkOrder : public WorkOrder {
   StorageManager *storage_manager_;
 
   const std::size_t delete_operator_index_;
+
   const tmb::client_id scheduler_client_id_;
   MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/DestroyHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 7d8acb7..b7fe1ac 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -48,6 +48,8 @@ class DestroyHashOperator : public RelationalOperator {
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param hash_table_index The index of the JoinHashTable in QueryContext.
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param hash_table_index The index of the JoinHashTable in QueryContext.
    **/
   DestroyHashOperator(const std::size_t query_id,
                       const QueryContext::join_hash_table_id hash_table_index)
@@ -80,6 +82,8 @@ class DestroyHashWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param hash_table_index The index of the JoinHashTable in QueryContext.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param hash_table_index The index of the JoinHashTable in QueryContext.
    * @param query_context The QueryContext to use.
    **/
   DestroyHashWorkOrder(const std::size_t query_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index a0a8d6e..0cdb733 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -57,6 +57,8 @@ class DropTableOperator : public RelationalOperator {
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param relation The relation to drop.
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param relation The relation to drop.
    * @param database The databse where to drop \c relation.
    * @param only_drop_blocks If true, only drop the blocks belonging to \c
    *        relation, but leave \c relation in \c database.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 1d5d4e3..d2d3e74 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -631,6 +631,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
    *        is using attributes from the build relation as input. Note that the
    *        length of this vector should equal the length of \p selection.
    * @param lookup_block_id The block id of the probe_relation.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
    **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 86f8eaf..3125447 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -85,6 +85,7 @@ class RebuildWorkOrder : public WorkOrder {
     proto.set_operator_index(input_operator_index_);
     proto.set_block_id(block_ref_->getID());
     proto.set_relation_id(input_relation_id_);
+    proto.set_query_id(query_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 6bf5719..40fde13 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -259,6 +259,7 @@ void SortMergeRunWorkOrder::execute() {
 
   // Send completion message to operator.
   FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage,
+                      getQueryID(),
                       operator_index_,
                       serialized_output.first,
                       serialized_output.second);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 6a6af4b..c8adb9a 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -115,6 +115,8 @@ class TableGeneratorWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param generator_function The GeneratorFunctionHandle to use.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param generator_function The GeneratorFunctionHandle to use.
    * @param output_destination The InsertDestination to insert the generated
    *        output.
    **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5acecbf..3bfd00a 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -609,6 +609,7 @@ void TextSplitWorkOrder::execute() {
 
   // Notify the operator about the completion of this Work Order.
   FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
+                      getQueryID(),
                       operator_index_,
                       nullptr /* payload */,
                       0 /* payload_size */,
@@ -670,6 +671,7 @@ void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) {
 
   const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
   FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
+                               getQueryID(),
                                operator_index_,
                                payload,
                                payload_size);
@@ -678,6 +680,7 @@ void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) {
   // Notify Foreman for the avaiable work order on the blob.
   serialization::WorkOrdersAvailableMessage message_proto;
   message_proto.set_operator_index(operator_index_);
+  message_proto.set_query_id(query_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t message_proto_length = message_proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 3cda65b..4fd5c04 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -381,6 +381,7 @@ class TextSplitWorkOrder : public WorkOrder {
   StorageManager *storage_manager_;
 
   const std::size_t operator_index_;  // Opeartor index.
+
   const tmb::client_id scheduler_client_id_;  // The scheduler's TMB client ID.
   MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 2130563..7a99000 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -91,6 +91,7 @@ void UpdateWorkOrder::execute() {
   proto.set_operator_index(update_operator_index_);
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(relation_.getID());
+  proto.set_query_id(query_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index cebb9b5..b4f9b9d 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -181,6 +181,7 @@ class UpdateWorkOrder : public WorkOrder {
   StorageManager *storage_manager_;
 
   const std::size_t update_operator_index_;
+
   const tmb::client_id scheduler_client_id_;
   MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 059865d..df195cc 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -65,20 +65,25 @@ class WorkOrder {
    *       relational operator.
    */
   struct FeedbackMessageHeader {
+    std::size_t query_id;
     std::size_t rel_op_index;
     std::size_t payload_size;
     FeedbackMessageType payload_type;
 
     /**
      * @brief Header constructor.
+     *
+     * @param query_id The ID of the query.
      * @param relational_op_index Index of the relation operator.
      * @param payload_size Size of the payload of the message.
      * @param payload_type Type of payload.
      */
-    FeedbackMessageHeader(const std::size_t relational_op_index,
+    FeedbackMessageHeader(const std::size_t query_id,
+                          const std::size_t relational_op_index,
                           const std::size_t payload_size,
                           const FeedbackMessageType payload_type)
-        : rel_op_index(relational_op_index),
+        : query_id(query_id),
+          rel_op_index(relational_op_index),
           payload_size(payload_size),
           payload_type(payload_type) {}
   };
@@ -93,17 +98,19 @@ class WorkOrder {
      * @brief Feedback message constructor.
      *
      * @param type Type of the message.
+     * @param query_id The ID of the query.
      * @param rel_op_index Relational operator index.
      * @param payload Blob of payload.
      * @param payload_size Size of the payload blob.
      * @param ownership Whether to take ownership of the payload blob.
      */
     FeedbackMessage(const FeedbackMessageType type,
+                    const std::size_t query_id,
                     const std::size_t rel_op_index,
                     void *payload,
                     const std::size_t payload_size,
                     const bool ownership = true)
-        : header_(rel_op_index, payload_size, type),
+        : header_(query_id, rel_op_index, payload_size, type),
           payload_(payload),
           ownership_(ownership) {}
 
@@ -285,6 +292,13 @@ class WorkOrder {
         " receiver thread with TMB client ID " << receiver_id;
   }
 
+  /**
+   * @brief Get the ID of the query which this WorkOder belongs to.
+   **/
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
  protected:
   /**
    * @brief Constructor.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index fdcc54f..27d974d 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -228,6 +228,8 @@ class AggregationOperatorTest : public ::testing::Test {
 
     // Setup the aggregation state proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
+
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
     serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
     aggr_state_proto->set_relation_id(table_->getID());
@@ -277,6 +279,7 @@ class AggregationOperatorTest : public ::testing::Test {
     const QueryContext::insert_destination_id insert_destination_index =
         query_context_proto.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_->getID());
@@ -319,6 +322,8 @@ class AggregationOperatorTest : public ::testing::Test {
 
     // Setup the aggregation state proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
+
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
     serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
     aggr_state_proto->set_relation_id(table_->getID());
@@ -362,6 +367,8 @@ class AggregationOperatorTest : public ::testing::Test {
     const QueryContext::insert_destination_id insert_destination_index =
         query_context_proto.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto.query_id());
+    insert_destination_proto->set_query_id(query_context_proto.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 074b603..8fee50d 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -294,6 +294,7 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
 TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -355,6 +356,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);
@@ -434,6 +436,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -503,6 +506,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);
@@ -604,6 +608,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -660,6 +665,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);
@@ -739,6 +745,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
 TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -801,6 +808,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);
@@ -906,6 +914,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -973,6 +982,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);
@@ -1083,6 +1093,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -1150,6 +1161,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
   const QueryContext::insert_destination_id output_destination_index =
       query_context_proto.insert_destinations_size();
   serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+  insert_destination_proto->set_query_id(query_context_proto.query_id());
 
   insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   insert_destination_proto->set_relation_id(output_relation_id);