You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ha...@apache.org on 2016/06/15 19:48:43 UTC

[40/50] [abbrv] incubator-quickstep git commit: Long lived Foreman thread

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
new file mode 100644
index 0000000..2145429
--- /dev/null
+++ b/query_execution/PolicyEnforcer.cpp
@@ -0,0 +1,183 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  // All execution slots are taken: park the handle until a slot frees up.
+  if (admitted_queries_.size() >= kMaxConcurrentQueries) {
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  // A slot is free, but query IDs must be unique among admitted queries.
+  const std::size_t new_query_id = query_handle->query_id();
+  if (admitted_queries_.find(new_query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << new_query_id << " exists";
+    return false;
+  }
+
+  // Create the QueryManager that will drive this query's execution.
+  admitted_queries_[new_query_id].reset(
+      new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                       catalog_database_, storage_manager_, bus_));
+  return true;
+}
+
+void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+  // TODO(harshad) : Provide processXMessage() public functions in
+  // QueryManager, so that we need to extract message from the
+  // TaggedMessage only once.
+  const void *payload = tagged_message.message();
+  const auto payload_bytes = tagged_message.message_bytes();
+  std::size_t query_id;  // Recovered from the payload of whichever type arrived.
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage:  // Fall through.
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::WorkOrderCompletionMessage completion_proto;
+      CHECK(completion_proto.ParseFromArray(payload, payload_bytes));
+      query_id = completion_proto.query_id();
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage new_block_proto;
+      CHECK(new_block_proto.ParseFromArray(payload, payload_bytes));
+      query_id = new_block_proto.query_id();
+      break;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage pipeline_proto;
+      CHECK(pipeline_proto.ParseFromArray(payload, payload_bytes));
+      query_id = pipeline_proto.query_id();
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage available_proto;
+      CHECK(available_proto.ParseFromArray(payload, payload_bytes));
+      query_id = available_proto.query_id();
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      WorkOrder::FeedbackMessage feedback(const_cast<void *>(payload), payload_bytes);
+      query_id = feedback.header().query_id;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+  }
+  // Hand the untouched message to the QueryManager that owns this query.
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  const QueryManager::QueryStatusCode return_code =
+      admitted_queries_[query_id]->processMessage(tagged_message);
+  if (return_code != QueryManager::QueryStatusCode::kQueryExecuted) {
+    return;
+  }
+  // The query is done: drop it and admit the earliest waiting query, if any.
+  removeQuery(query_id);
+  if (!waiting_queries_.empty()) {
+    QueryHandle *next_in_line = waiting_queries_.front();
+    waiting_queries_.pop();
+    admitQuery(next_in_line);
+  }
+}
+
+void PolicyEnforcer::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  // Each admitted query gets an equal slice of the per-round budget
+  // (FLAGS_max_msgs_per_dispatch_round). A query stops contributing as
+  // soon as it has no message ready or its slice is used up.
+  DCHECK(worker_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  const std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  // Removal is deferred so the map is not modified while it is iterated.
+  std::vector<std::size_t> finished_ids;
+  for (const auto &entry : admitted_queries_) {
+    QueryManager *manager = entry.second.get();
+    DCHECK(manager != nullptr);
+    // Pull at most per_query_share messages out of this query.
+    for (std::size_t taken = 0; taken < per_query_share; ++taken) {
+      WorkerMessage *message =
+          manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+      if (message == nullptr) {
+        // Nothing is ready for this query right now; if that is because
+        // the query has finished executing, schedule it for removal.
+        if (manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          finished_ids.push_back(entry.first);
+        }
+        break;
+      }
+      worker_messages->push_back(std::unique_ptr<WorkerMessage>(message));
+    }
+  }
+
+  // Now that iteration is over, drop the queries that have finished.
+  for (const std::size_t id : finished_ids) {
+    removeQuery(id);
+  }
+}
+
+void PolicyEnforcer::removeQuery(const std::size_t query_id) {
+  // Only admitted queries can be removed; warn if this one is still running.
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManager *departing = admitted_queries_[query_id].get();
+  LOG_IF(WARNING, !departing->getQueryExecutionState().hasQueryExecutionFinished())
+      << "Removing query with ID " << query_id << " that hasn't finished its execution";
+  admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcer::admitQueries(const std::vector<QueryHandle*> &query_handles) {
+  // Offer every handle to admitQuery(), which queues the ones that must wait.
+  // Stopping at the first refusal would silently drop the remaining handles.
+  bool all_admitted = true;
+  for (QueryHandle *curr_query : query_handles) {
+    all_admitted = admitQuery(curr_query) && all_admitted;
+  }
+  return all_admitted;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
new file mode 100644
index 0000000..5915b79
--- /dev/null
+++ b/query_execution/PolicyEnforcer.hpp
@@ -0,0 +1,167 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcer {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param bus The TMB.
+   **/
+  PolicyEnforcer(const tmb::client_id foreman_client_id,
+                 const std::size_t num_numa_nodes,
+                 CatalogDatabaseLite *catalog_database,
+                 StorageManager *storage_manager,
+                 tmb::MessageBus *bus)
+      : foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        catalog_database_(catalog_database),
+        storage_manager_(storage_manager),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor. Warns if admitted or waiting queries remain.
+   **/
+  ~PolicyEnforcer() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return True only if the query starts now; a waiting query yields false.
+   **/
+  bool admitQuery(QueryHandle *query_handle);
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note In the current simple implementation, we only allow one active
+   *       query in the system. Other queries will have to wait.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+ private:
+  static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  CatalogDatabaseLite *catalog_database_;
+  StorageManager *storage_manager_;
+
+  tmb::MessageBus *bus_;
+
+  // Key = query ID, value = the QueryManager for that admitted query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 3bfce17..54dd557 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -89,13 +89,13 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
 
   for (int i = 0; i < proto.insert_destinations_size(); ++i) {
     const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i);
-    insert_destinations_.emplace_back(
-        InsertDestination::ReconstructFromProto(insert_destination_proto,
-                                                database.getRelationSchemaById(
-                                                    insert_destination_proto.relation_id()),
-                                                storage_manager,
-                                                scheduler_client_id,
-                                                bus));
+    insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto(
+        proto.query_id(),
+        insert_destination_proto,
+        database.getRelationSchemaById(insert_destination_proto.relation_id()),
+        storage_manager,
+        scheduler_client_id,
+        bus));
   }
 
   for (int i = 0; i < proto.predicates_size(); ++i) {
@@ -231,7 +231,7 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
-  return true;
+  return proto.IsInitialized();
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b37286c..98cd0b6 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -54,4 +54,6 @@ message QueryContext {
 
   // NOTE(zuyu): For UpdateWorkOrder only.
   repeated UpdateGroup update_groups = 10;
+
+  required uint64 query_id = 11;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 15803cf..9d9a9e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -27,6 +27,7 @@ message EmptyMessage {
 message WorkOrderCompletionMessage {
   required uint64 operator_index = 1;
   required uint64 worker_thread_index = 2;
+  required uint64 query_id = 3;
 }
 
 message CatalogRelationNewBlockMessage {
@@ -35,16 +36,19 @@ message CatalogRelationNewBlockMessage {
 
   // Used by PartitionAwareInsertDestination.
   optional uint64 partition_id = 3;
+  required uint64 query_id = 4;
 }
 
 message DataPipelineMessage {
   required uint64 operator_index = 1;
   required fixed64 block_id = 2;
   required int32 relation_id = 3;
+  required uint64 query_id = 4;
 }
 
 message WorkOrdersAvailableMessage {
   required uint64 operator_index = 1;
+  required uint64 query_id = 2;
 }
 
 // BlockLocator related messages.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fc253bc..9d1060f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -58,6 +58,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
 
 // We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
+  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted, from
+                         // the main thread to Foreman.
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
@@ -67,7 +69,8 @@ enum QueryExecutionMessageType : message_type_id {
                               // their corresponding RelationalOperators.
   kRebuildWorkOrderMessage,  // From Foreman to Worker.
   kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
-  kPoisonMessage,  // From the CLI shell to Foreman, then from Foreman to Workers.
+  kWorkloadCompletionMessage,  // From Foreman to main thread.
+  kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
   // BlockLocator related messages, sorted in a life cycle of StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index a8b6a38..78fd159 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -17,9 +17,12 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 
+#include <memory>
 #include <utility>
 
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/address.h"
@@ -60,6 +63,55 @@ class QueryExecutionUtil {
                      std::move(tagged_message));
   }
 
+  /**
+   * @brief Construct and send an AdmitRequestMessage from a given sender to a
+   *        given recipient.
+   *
+   * @param sender_id The TMB client ID of the sender.
+   * @param receiver_id The TMB client ID of the receiver.
+   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+   * @param bus A pointer to the TMB.
+   * @note The request is release()d, not deleted, here - TODO confirm who frees it.
+   *
+   * @return A status code indicating the result of the message delivery.
+   *         The caller should ensure that the status is SendStatus::kOK.
+   **/
+  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+      const tmb::client_id sender_id,
+      const tmb::client_id receiver_id,
+      QueryHandle *query_handle,
+      MessageBus *bus) {
+    std::unique_ptr<AdmitRequestMessage> request_message(
+        new AdmitRequestMessage(query_handle));
+    const std::size_t size_of_request_msg = sizeof(*request_message);
+    TaggedMessage admit_tagged_message(
+        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+    return QueryExecutionUtil::SendTMBMessage(
+        bus, sender_id, receiver_id, std::move(admit_tagged_message));
+  }
+
+  static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
+    // Terminate all threads.
+    // The sender thread broadcasts a poison message to the workers and foreman.
+    // Each worker dies after receiving the poison message. The order of the
+    // workers' deaths is irrelevant. A failed send aborts via CHECK below.
+    MessageStyle style;
+    style.Broadcast(true);
+    Address address;
+    address.All(true);
+    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+    TaggedMessage poison_tagged_message(poison_message.get(),
+                                        sizeof(*poison_message),
+                                        kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus send_status = bus->Send(
+        sender_id, address, style, std::move(poison_tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+       "Broadcast poison message from sender with TMB client ID " << sender_id
+       << " failed";
+  }
+
  private:
   /**
    * @brief Constructor. Made private to avoid instantiation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
index 47f54c5..b52460f 100644
--- a/query_execution/QueryManager.hpp
+++ b/query_execution/QueryManager.hpp
@@ -25,18 +25,21 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/DAG.hpp"
 #include "utility/Macros.hpp"
 
+#include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
 namespace quickstep {
 
 class CatalogDatabaseLite;
-class ForemanMessage;
 class QueryHandle;
 class StorageManager;
 class WorkerMessage;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index eb9aedd..d023daa 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -28,6 +28,8 @@
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -76,7 +78,7 @@ class WorkOrdersContainer {
    * @return If there are pending WorkOrders.
    **/
   inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return normal_workorders_[operator_index].hasWorkOrder();
   }
 
@@ -92,9 +94,9 @@ class WorkOrdersContainer {
    **/
   inline bool hasNormalWorkOrderForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -108,7 +110,7 @@ class WorkOrdersContainer {
    * @return If there are pending rebuild WorkOrders.
    **/
   inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return rebuild_workorders_[operator_index].hasWorkOrder();
   }
 
@@ -124,9 +126,9 @@ class WorkOrdersContainer {
    **/
   inline bool hasRebuildWorkOrderForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -144,9 +146,9 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
                                            const int numa_node_id) {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return normal_workorders_[operator_index].getWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -164,7 +166,7 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
                                 const bool prefer_single_NUMA_node = true) {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return normal_workorders_[operator_index].getWorkOrder(
         prefer_single_NUMA_node);
   }
@@ -182,9 +184,9 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
                                             const int numa_node_id) {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
         numa_node_id);
   }
@@ -202,7 +204,7 @@ class WorkOrdersContainer {
    **/
   WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
                                  const bool prefer_single_NUMA_node = true) {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return rebuild_workorders_[operator_index].getWorkOrder(
         prefer_single_NUMA_node);
   }
@@ -220,8 +222,8 @@ class WorkOrdersContainer {
    * @param operator_index The index of the operator in the query DAG.
    **/
   void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
-    DEBUG_ASSERT(workorder != nullptr);
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(workorder != nullptr);
+    DCHECK_LT(operator_index, num_operators_);
     normal_workorders_[operator_index].addWorkOrder(workorder);
   }
 
@@ -238,8 +240,8 @@ class WorkOrdersContainer {
    **/
   void addRebuildWorkOrder(WorkOrder *workorder,
                            const std::size_t operator_index) {
-    DEBUG_ASSERT(workorder != nullptr);
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK(workorder != nullptr);
+    DCHECK_LT(operator_index, num_operators_);
     rebuild_workorders_[operator_index].addWorkOrder(workorder);
   }
 
@@ -254,9 +256,9 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumNormalWorkOrdersForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
         numa_node_id);
   }
@@ -271,7 +273,7 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumNormalWorkOrders(
       const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return normal_workorders_[operator_index].getNumWorkOrders();
   }
 
@@ -286,9 +288,9 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumRebuildWorkOrdersForNUMANode(
       const std::size_t operator_index, const int numa_node_id) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
-    DEBUG_ASSERT(numa_node_id >= 0);
-    DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+    DCHECK_LT(operator_index, num_operators_);
+    DCHECK_GE(numa_node_id, 0);
+    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
     return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
         numa_node_id);
   }
@@ -303,7 +305,7 @@ class WorkOrdersContainer {
    **/
   inline std::size_t getNumRebuildWorkOrders(
       const std::size_t operator_index) const {
-    DEBUG_ASSERT(operator_index < num_operators_);
+    DCHECK_LT(operator_index, num_operators_);
     return rebuild_workorders_[operator_index].getNumWorkOrders();
   }
 
@@ -418,8 +420,8 @@ class WorkOrdersContainer {
     void addWorkOrder(WorkOrder *workorder);
 
     bool hasWorkOrderForNUMANode(const int numa_node_id) const {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
       return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
              multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
                  numa_node_id);
@@ -440,8 +442,8 @@ class WorkOrdersContainer {
 
     std::size_t getNumWorkOrdersForNUMANode(
         const int numa_node_id) const {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
       return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
              multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
                  numa_node_id);
@@ -463,8 +465,8 @@ class WorkOrdersContainer {
     }
 
     WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
-      DEBUG_ASSERT(numa_node_id >= 0);
-      DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+      DCHECK_GE(numa_node_id, 0);
+      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
       WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
       if (work_order == nullptr) {
         work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 645fd05..ef596e1 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -58,11 +58,14 @@ void Worker::run() {
         WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
         DCHECK(message.getWorkOrder() != nullptr);
         message.getWorkOrder()->execute();
+        const std::size_t query_id_for_workorder =
+            message.getWorkOrder()->getQueryID();
         delete message.getWorkOrder();
 
-        sendWorkOrderCompleteMessage(annotated_msg.sender,
-                                     message.getRelationalOpIndex(),
-                                     tagged_message.message_type() == kRebuildWorkOrderMessage);
+        sendWorkOrderCompleteMessage(
+            annotated_msg.sender, message.getRelationalOpIndex(),
+            query_id_for_workorder,
+            tagged_message.message_type() == kRebuildWorkOrderMessage);
         break;
       }
       case kPoisonMessage: {
@@ -76,10 +79,12 @@ void Worker::run() {
 
 void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
                                           const size_t op_index,
+                                          const size_t query_id,
                                           const bool is_rebuild_work_order) {
   serialization::WorkOrderCompletionMessage proto;
   proto.set_operator_index(op_index);
   proto.set_worker_thread_index(worker_thread_index_);
+  proto.set_query_id(query_id);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index b94e937..c0bafdc 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -97,11 +97,13 @@ class Worker : public Thread {
    *
    * @param receiver The id of the TMB client which should receive the response.
    * @param op_index The index of the operator to which the WorkOrder belongs.
+   * @param query_id The ID of the query which the WorkOrder belongs to.
    * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
    *        false.
    **/
   void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
                                     const std::size_t op_index,
+                                    const std::size_t query_id,
                                     const bool is_rebuild_work_order);
 
   const std::size_t worker_thread_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index ec63af9..560c1ba 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -30,6 +30,8 @@ class WorkOrder;
  **/
 class WorkerMessage {
  public:
+  static constexpr int kInvalidRecipientIndexHint = -1;
+
   enum class WorkerMessageType {
     kRebuildWorkOrder = 0,
     kWorkOrder,
@@ -105,6 +107,23 @@ class WorkerMessage {
     return type_;
   }
 
+  /**
+   * @brief Set a hint for the recipient worker thread.
+   *
+   * @param recipient_index_hint The hint, i.e., the worker thread index.
+   **/
+  inline void setRecipientHint(const int recipient_index_hint) {
+    recipient_index_hint_ = recipient_index_hint;
+  }
+
+  /**
+   * @brief Get the hint for the recipient worker thread. The hint is invalid if
+   *        it is kInvalidRecipientIndexHint.
+   **/
+  inline int getRecipientHint() const {
+    return recipient_index_hint_;
+  }
+
  private:
   /**
    * @brief Constructor.
@@ -120,12 +139,13 @@ class WorkerMessage {
                 const WorkerMessageType type)
       : work_unit_(work_unit),
         relational_op_index_(relational_op_index),
-        type_(type) {
-  }
+        type_(type),
+        recipient_index_hint_(kInvalidRecipientIndexHint) {}
 
   WorkOrder *work_unit_;
   const std::size_t relational_op_index_;
   const WorkerMessageType type_;
+  int recipient_index_hint_;
 };
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/Foreman_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Foreman_unittest.cpp b/query_execution/tests/Foreman_unittest.cpp
deleted file mode 100644
index cbe5088..0000000
--- a/query_execution/tests/Foreman_unittest.cpp
+++ /dev/null
@@ -1,952 +0,0 @@
-/**
- *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015-2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include <climits>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/Foreman.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::client_id;
-
-namespace quickstep {
-
-class WorkOrderProtosContainer;
-
-class MockWorkOrder : public WorkOrder {
- public:
-  explicit MockWorkOrder(const int op_index)
-      : WorkOrder(0), op_index_(op_index) {}
-
-  void execute() override {
-    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
-  }
-
-  inline QueryPlan::DAGNodeIndex getOpIndex() const {
-    return op_index_;
-  }
-
- private:
-  const QueryPlan::DAGNodeIndex op_index_;
-
-  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
-};
-
-class MockOperator: public RelationalOperator {
- public:
-  enum function_name {
-    kFeedInputBlock = 0,
-    kFeedInputBlocks,
-    kDoneFeedingInputBlocks,
-    kGetAllWorkOrders
-  };
-
-  MockOperator(const bool produce_workorders,
-               const bool has_streaming_input,
-               const int max_getworkorder_iters = 1,
-               const int max_workorders = INT_MAX)
-      : RelationalOperator(0 /* Query Id */),
-        produce_workorders_(produce_workorders),
-        has_streaming_input_(has_streaming_input),
-        max_workorders_(max_workorders),
-        max_getworkorder_iters_(max_getworkorder_iters),
-        num_calls_get_workorders_(0),
-        num_workorders_generated_(0),
-        num_calls_feedblock_(0),
-        num_calls_feedblocks_(0),
-        num_calls_donefeedingblocks_(0) {
-  }
-
-#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
-
-  // The methods below are used to check whether Foreman calls the Relational
-  // operator, how many times it calls a particular method etc.
-  inline int getNumWorkOrders() const {
-    return num_workorders_generated_;
-  }
-
-  inline int getNumCalls(const function_name fname) const {
-    switch (fname) {
-      case kFeedInputBlock:
-        return num_calls_feedblock_;
-      case kFeedInputBlocks:
-        return num_calls_feedblocks_;
-      case kDoneFeedingInputBlocks:
-        return num_calls_donefeedingblocks_;
-      case kGetAllWorkOrders:
-        return num_calls_get_workorders_;
-      default:
-        return -1;
-    }
-  }
-
-  inline bool getBlockingDependenciesMet() const {
-    MOCK_OP_LOG(3) << "met.";
-    return blocking_dependencies_met_;
-  }
-
-  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
-    insert_destination_index_ = insert_destination_index;
-  }
-
-  // Mock to trigger doneFeedingInputBlocks for the dependent operators
-  // in Foreman::markOperatorFinished.
-  void setOutputRelationID(const relation_id rel_id) {
-    output_relation_id_ = rel_id;
-  }
-
-  // Override methods from the base class.
-  bool getAllWorkOrders(
-      WorkOrdersContainer *container,
-      QueryContext *query_context,
-      StorageManager *storage_manager,
-      const tmb::client_id foreman_client_id,
-      tmb::MessageBus *bus) override {
-    ++num_calls_get_workorders_;
-    if (produce_workorders_) {
-      if (has_streaming_input_) {
-        if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      } else {
-        if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[static] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      }
-    }
-    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
-                   << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
-    return num_calls_get_workorders_ == max_getworkorder_iters_;
-  }
-
-  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
-    return true;
-  }
-
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
-    ++num_calls_feedblock_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
-  }
-
-  void feedInputBlocks(const relation_id rel_id,
-                       std::vector<block_id> *partially_filled_blocks) override {
-    ++num_calls_feedblocks_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
-  }
-
-  void doneFeedingInputBlocks(const relation_id rel_id) override {
-    ++num_calls_donefeedingblocks_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
-  }
-
-  QueryContext::insert_destination_id getInsertDestinationID() const override {
-    return insert_destination_index_;
-  }
-
-  const relation_id getOutputRelationID() const override {
-    return output_relation_id_;
-  }
-
- private:
-  const bool produce_workorders_;
-  const bool has_streaming_input_;
-  const int max_workorders_;
-  const int max_getworkorder_iters_;
-
-  int num_calls_get_workorders_;
-  int num_workorders_generated_;
-  int num_calls_feedblock_;
-  int num_calls_feedblocks_;
-  int num_calls_donefeedingblocks_;
-
-  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
-
-  relation_id output_relation_id_ = -1;
-
-#undef MOCK_OP_LOG
-
-  DISALLOW_COPY_AND_ASSIGN(MockOperator);
-};
-
-
-class ForemanTest : public ::testing::Test {
- protected:
-  // Class ForemanTest is the friend of class Foreman. Each TEST_F behaves
-  // as a separate class, so we can't access Foreman's private members in
-  // TEST_F.
-  virtual void SetUp() {
-    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
-    storage_manager_.reset(new StorageManager("./"));
-
-    query_plan_.reset(new QueryPlan());
-
-    bus_.Initialize();
-
-    foreman_.reset(new Foreman(&bus_, db_.get(), storage_manager_.get()));
-
-    // This thread acts both as Foreman as well as Worker. Foreman connects to
-    // the bus in its constructor.
-    worker_client_id_ = bus_.Connect();
-
-    // Register as sender and receiver for relevant types of messages.
-    bus_.RegisterClientAsSender(worker_client_id_, kWorkOrderCompleteMessage);
-    bus_.RegisterClientAsSender(worker_client_id_, kRebuildWorkOrderCompleteMessage);
-    bus_.RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
-
-    std::vector<client_id> worker_client_ids;
-    worker_client_ids.push_back(worker_client_id_);
-
-    std::vector<numa_node_id> numa_nodes;
-    numa_nodes.push_back(static_cast<numa_node_id>(-1));
-
-    workers_.reset(new WorkerDirectory(1, worker_client_ids, numa_nodes));
-    foreman_->setWorkerDirectory(workers_.get());
-  }
-
-  inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
-    return foreman_->query_exec_state_->getNumQueuedWorkOrders(index);
-  }
-
-  inline const int getNumOperatorsFinished() const {
-    return foreman_->query_exec_state_->getNumOperatorsFinished();
-  }
-
-  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
-    return foreman_->query_exec_state_->hasExecutionFinished(index);
-  }
-
-  inline bool popWorkOrderIfAvailable(MockWorkOrder **workorder) {
-    AnnotatedMessage msg;
-    if (bus_.ReceiveIfAvailable(worker_client_id_, &msg)) {
-      WorkerMessage message(*static_cast<const WorkerMessage*>(msg.tagged_message.message()));
-      *workorder = static_cast<MockWorkOrder*>(message.getWorkOrder());
-      return true;
-    }
-    return false;
-  }
-
-  inline bool popRebuildWorkOrderIfAvailable(MockWorkOrder **workorder) {
-    return popWorkOrderIfAvailable(workorder);
-  }
-
-  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
-    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
-    foreman_->processDataPipelineMessage(source_operator_index, 0 /* block_id */, 0 /* relation_id */);
-    return foreman_->query_exec_state_->hasQueryExecutionFinished();
-  }
-
-  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
-    foreman_->processWorkOrderCompleteMessage(index, 0 /* worker id */);
-    return foreman_->query_exec_state_->hasQueryExecutionFinished();
-  }
-
-  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
-    foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
-    return foreman_->query_exec_state_->hasQueryExecutionFinished();
-  }
-
-  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
-    foreman_->processDataPipelineMessage(index,
-                                         BlockIdUtil::GetBlockId(1 /* domain */, 1),
-                                         0 /* relation_id */);
-    return foreman_->query_exec_state_->hasQueryExecutionFinished();
-  }
-
-  inline bool startForeman() {
-    foreman_->initialize();
-    return foreman_->query_exec_state_->hasQueryExecutionFinished();
-  }
-
-  inline int getWorkerInputQueueSize() {
-    return bus_.CountQueuedMessagesForClient(worker_client_id_);
-  }
-
-  unique_ptr<CatalogDatabase> db_;
-  unique_ptr<StorageManager> storage_manager_;
-
-  unique_ptr<QueryPlan> query_plan_;
-
-  unique_ptr<Foreman> foreman_;
-  MessageBusImpl bus_;
-
-  client_id worker_client_id_;
-
-  unique_ptr<WorkerDirectory> workers_;
-};
-
-TEST_F(ForemanTest, SingleNodeDAGNoWorkOrdersTest) {
-  // This test creates a DAG of a single node. No workorders are generated.
-  query_plan_->addRelationalOperator(new MockOperator(false, false));
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(0));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // Foreman exits after initialize, since no workorders are generated.
-  EXPECT_TRUE(startForeman());
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // No workorder is generated. No response is received.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // We expect one call for op's getAllWorkOrders().
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(ForemanTest, SingleNodeDAGStaticWorkOrdersTest) {
-  // This test creates a DAG of a single node. Static workorders are generated.
-  const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-  EXPECT_FALSE(startForeman());
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // We expect one call for op's getAllWorkOrders().
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // One workorder is generated.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-  EXPECT_EQ(1, op.getNumWorkOrders());
-
-  // Worker receives a WorkOrder.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  EXPECT_EQ(id, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to Foreman upon workorder completion.
-  // Last event processed by Foreman.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-}
-
-TEST_F(ForemanTest, SingleNodeDAGDynamicWorkOrdersTest) {
-  // This test creates a DAG of a single node. WorkOrders are generated
-  // dynamically as pending work orders complete execution, i.e.,
-  // getAllWorkOrders() is called multiple times.  getAllWorkOrders() will be
-  // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to
-  // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will insert no
-  // WorkOrder and return true.
-
-  // TODO(shoban): This test can not be more robust than this because of fixed
-  // scaffolding of mocking. If we use gMock, we can do much better.
-  const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-  EXPECT_FALSE(startForeman());
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  for (int i = 0; i < 3; i++) {
-    // We expect one call for op's getAllWorkOrders().
-    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-    // One workorder is generated.
-    EXPECT_EQ(1, getWorkerInputQueueSize());
-    EXPECT_EQ(i + 1, op.getNumWorkOrders());
-
-    // Worker receives a WorkOrder.
-    MockWorkOrder *work_order;
-    ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-    EXPECT_EQ(id, work_order->getOpIndex());
-
-    work_order->execute();
-    delete work_order;
-
-    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-    EXPECT_EQ(0, getNumOperatorsFinished());
-
-    if (i < 2) {
-      // Send a message to Foreman upon workorder completion.
-      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
-    } else {
-      // Send a message to Foreman upon workorder completion.
-      // Last event.
-      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-    }
-  }
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-
-  // We place this check in the end, since it's true throughout the test.
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(ForemanTest, TwoNodesDAGBlockingLinkTest) {
-  // We use two nodes in the DAG with a blocking link between them.
-  // There is no streaming of data involved in this test.
-  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false));
-  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, false));
-
-  // Create a blocking link.
-  query_plan_->addDirectDependency(id2, id1, true);
-
-  static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-      ->setOutputRelationID(0xdead);
-
-  const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  // Make sure queues are empty initially.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  EXPECT_FALSE(startForeman());
-
-  // op1 doesn't have any dependencies
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
-  // Only op1 should receive a call to getAllWorkOrders initially.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Only op1 should produce a workorder.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  // Worker receives a WorkOrder.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // This workorder's source should be op1.
-  EXPECT_EQ(id1, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-  // Foreman hasn't yet got workorder completion response for the workorder.
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to Foreman upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  // op1 is over now, op2 still to go.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  // op1 is op2's blocking dependency.
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 should get first call of getAllWorkOrders() when op1 is over.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Send a message to Foreman upon workorder (generated by op2) completion.
-
-  // Note that the worker hasn't yet popped the workorder. Usually this won't
-  // happen as workers pop workorders first, execute and then send the response.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  // WorkOrder yet to be popped by the worker.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_EQ(2, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have come from op2.
-  EXPECT_EQ(id2, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // Expect no additional calls to getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, TwoNodesDAGPipeLinkTest) {
-  // We use two nodes in the DAG with a non-blocking link between them.
-  // We stream output of op1 to op2. Sequeuce of events is as follows:
-  // 1. op1 creates a workorder.
-  // 2. We send a "block full" (from op1) to Foreman.
-  // 3. op2 creates a workorder because of step 2.
-  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-      ->setOutputRelationID(0xdead);
-
-  const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  // Make sure queues are empty initially.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  startForeman();
-
-  // As none of the operators have a blocking link, blocking dependencies should
-  // be met.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 will generate workorder only after receiving a streaming input.
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // There should be one workorder sent to the worker so far.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Worker receives a WorkOrder.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // This workorder's source be op1.
-  EXPECT_EQ(id1, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  // Send a message to Foreman upon block getting full (output of op1).
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet by the Foreman.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // No additional call to op1's getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Output from op1 should be fed to op2.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // A call to op2's getAllWorkOrders because of the streamed input.
-  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Place a message of a workorder completion of op1 on Foreman's input queue.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  // An additional call to op2's getAllWorkOrders because of completion of op1.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(2, op2.getNumWorkOrders());
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Pop a workorder from Foreman's output queue.
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have been generated by op2.
-  EXPECT_EQ(id2, work_order->getOpIndex());
-  work_order->execute();
-  delete work_order;
-
-  // Place a message of a workorder completion of op2 on Foreman's input queue.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Pop a workorder from Foreman's output queue.
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have been generated by op2.
-  EXPECT_EQ(id2, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  // Send a message to Foreman upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest) {
-  // In this test, we create a 2-node DAG with a non-blocking link between them.
-  // There is no streaming of data from op1 to op2 during the execution of op1.
-  // op1 produces a partially filled block at the end of its execution which is
-  // rebuilt and then fed to op2.
-  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  // Create a relation, owned by db_.
-  CatalogRelation *relation = new CatalogRelation(nullptr /* catalog_database */, "test_relation");
-  const relation_id output_relation_id = db_->addRelation(relation);
-
-  // Setup the InsertDestination proto in the query context proto.
-  serialization::QueryContext query_context_proto;
-
-  const QueryContext::insert_destination_id insert_destination_index =
-      query_context_proto.insert_destinations_size();
-  serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
-
-  insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
-  insert_destination_proto->set_relation_id(output_relation_id);
-  insert_destination_proto->set_relational_op_index(id1);
-
-  MockOperator *op1_mutable =
-      static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
-  op1_mutable->setInsertDestinationID(insert_destination_index);
-  op1_mutable->setOutputRelationID(output_relation_id);
-
-  const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-  foreman_->reconstructQueryContextFromProto(query_context_proto);
-
-  // NOTE(zuyu): An operator generally has no idea about partially filled
-  // blocks, but InsertDestination in QueryContext does.
-  // Mock to add partially filled blocks in the InsertDestination.
-  InsertDestination *insert_destination =
-      foreman_->query_context_->getInsertDestination(insert_destination_index);
-  DCHECK(insert_destination != nullptr);
-  MutableBlockReference block_ref;
-  static_cast<BlockPoolInsertDestination*>(insert_destination)->available_block_refs_.push_back(move(block_ref));
-
-  // Make sure queues are empty initially.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  startForeman();
-
-  // There's no blocking dependency in the DAG.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  // Worker receives a WorkOrder.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have been generated by op1.
-  EXPECT_EQ(id1, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // Send a message to Foreman upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-
-  // op1 generates a rebuild workorder. The block is rebuilt and streamed
-  // to Foreman.
-  EXPECT_FALSE(placeDataPipelineMessage(id1));
-
-  // Based on the streamed input, op2's getAllWorkOrders should produce a
-  // workorder.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Worker receives a rebuild WorkOrder.
-  MockWorkOrder *rebuild_op1;
-  ASSERT_TRUE(popRebuildWorkOrderIfAvailable(&rebuild_op1));
-  // We skip the check for relation ID of the rebuild WorkOrder, as the partially
-  // filled block reference is a mock reference with garbage contents.
-  delete rebuild_op1;
-
-  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Worker receives a WorkOrder.
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have been generated by op2.
-  EXPECT_EQ(id2, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // Send a message to Foreman upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, MultipleNodesNoOutputTest) {
-  // When an operator produces workorders but no output, the Foreman should
-  // check the dependents of this operator to make progress.
-  const QueryPlan::DAGNodeIndex kNumNodes = 5;
-  std::vector<QueryPlan::DAGNodeIndex> ids;
-  ids.reserve(kNumNodes);
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    if (i == 0) {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
-    } else {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
-    }
-    VLOG(3) << ids[i];
-  }
-
-  /**
-   * The DAG looks like this:
-   *
-   * op1 -> op2 -> op3 -> op4 -> op5
-   *
-   **/
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
-    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
-    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
-        ->setOutputRelationID(0xdead);
-  }
-
-  std::vector<const MockOperator*> operators;
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
-  }
-
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
-  // Make sure queues are empty initially.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  startForeman();
-
-  // operators[0] should have produced a workorder by now.
-  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
-  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders));
-  }
-
-  // Worker receives a WorkOrder.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // The workorder should have been generated by operators[0].
-  EXPECT_EQ(ids[0], work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  // Send a message to Foreman upon workorder (generated by operators[0])
-  // completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
-    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
-    if (i < kNumNodes - 1) {
-      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-    }
-  }
-}
-
-TEST_F(ForemanTest, OutOfOrderWorkOrderCompletionTest) {
-  // Consider two operators, each generating one workorder. The dependent's
-  // workorder finishes before the dependency's workorder.
-  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-  // There should be two workorders on Worker's private queue, for this test.
-  foreman_->setMaxMessagesPerWorker(2);
-
-  // Make sure queues are empty initially.
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  startForeman();
-
-  // Expect one workorder produced by op1.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Pop a workorder from Foreman's output queue.
-  MockWorkOrder *work_order;
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // This workorder's source should be op1.
-  EXPECT_EQ(id1, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  // Send a message to Foreman upon a block (output of op1) getting full.
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // Expect one workorder produced by op2.
-  EXPECT_EQ(1, getWorkerInputQueueSize());
-
-  // Worker receives a WorkOrder.
-  ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
-  // This workorder's source should be op2.
-  EXPECT_EQ(id2, work_order->getOpIndex());
-
-  work_order->execute();
-  delete work_order;
-
-  // As mentioned earlier, op2 finishes before op1.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-
-  // op1's workorder execution is over.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 9ba5978..4f98748 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -228,8 +228,9 @@ class QueryManagerTest : public ::testing::Test {
     db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
     storage_manager_.reset(new StorageManager("./"));
     bus_.Initialize();
-    query_handle_.reset(new QueryHandle(0));
+    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
     query_plan_ = query_handle_->getQueryPlanMutable();
+    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
   }
 
   inline void constructQueryManager() {
@@ -256,6 +257,7 @@ class QueryManagerTest : public ::testing::Test {
 
     proto.set_block_id(0);  // dummy block ID
     proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -276,6 +278,7 @@ class QueryManagerTest : public ::testing::Test {
     serialization::WorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const size_t proto_length = proto.ByteSize();
@@ -297,6 +300,7 @@ class QueryManagerTest : public ::testing::Test {
     serialization::WorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker thread ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const size_t proto_length = proto.ByteSize();
@@ -320,6 +324,7 @@ class QueryManagerTest : public ::testing::Test {
 
     proto.set_block_id(0);  // dummy block ID
     proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/WorkOrdersContainer_unittest.cpp b/query_execution/tests/WorkOrdersContainer_unittest.cpp
index cf133c4..cb583ab 100644
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@ -72,6 +72,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
   // they get inserted and retrieved correctly.
   std::vector<int> numa_node_ids;
   // A container for one operator and no NUMA nodes.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 0);
 
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -104,11 +105,15 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
   ASSERT_TRUE(returned_work_order != nullptr);
   EXPECT_EQ(work_order.getID(), static_cast<MockNUMAWorkOrder*>(returned_work_order)->getID());
 
+  EXPECT_EQ(query_id, returned_work_order->getQueryID());
+
   WorkOrder *returned_rebuild_work_order = w.getRebuildWorkOrder(0);
   ASSERT_TRUE(returned_rebuild_work_order != nullptr);
   EXPECT_EQ(work_order1.getID(),
             static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
 
+  EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
+
   // Container should be empty now.
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
   EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@ -123,6 +128,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
   // if they get inserted and retrieved correctly and the order of retrieval.
   // A container for one operator and no NUMA nodes.
   std::vector<int> numa_node_ids;
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 0);
 
   EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -164,6 +170,8 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
     ASSERT_TRUE(returned_work_order != nullptr);
     EXPECT_EQ(static_cast<int>(kNumWorkOrders + i),
               static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+    EXPECT_EQ(query_id, returned_work_order->getQueryID());
+    EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
   }
 
   // Container should be empty now.
@@ -190,6 +198,7 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
   const std::size_t kNUMANodesUsed = numa_node_ids.size();
 
   // A container for one operator and kNUMANodes.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, kNUMANodes);
 
   for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
@@ -246,6 +255,9 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
     ASSERT_TRUE(returned_rebuild_work_order != nullptr);
     EXPECT_EQ(rebuild_workorders[i].getID(),
               static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+
+    EXPECT_EQ(query_id, returned_work_order->getQueryID());
+    EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
   }
 
   // No workorder should be left for this operator on any NUMA node.
@@ -291,6 +303,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   const std::size_t kNUMANodesUsed = numa_nodes.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, kNUMANodes);
 
   w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@ -331,6 +344,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
       w.getNormalWorkOrderForNUMANode(0, numa_nodes[0]));
   ASSERT_TRUE(observed_work_order != nullptr);
 
+  EXPECT_EQ(query_id, observed_work_order->getQueryID());
   EXPECT_EQ(one_numa_work_order.getPreferredNUMANodes().front(),
             observed_work_order->getPreferredNUMANodes().front());
   EXPECT_EQ(one_numa_work_order.getID(), observed_work_order->getID());
@@ -348,6 +362,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   EXPECT_EQ(no_numa_work_order.getID(),
             static_cast<MockNUMAWorkOrder *>(observed_non_numa_work_order)->getID());
 
+  EXPECT_EQ(query_id, observed_non_numa_work_order->getQueryID());
   EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[0]));
   EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[1]));
 
@@ -361,6 +376,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
   ASSERT_TRUE(observed_work_order_multiple_numa_nodes != nullptr);
   EXPECT_EQ(multiple_numa_work_order.getID(), observed_work_order_multiple_numa_nodes->getID());
 
+  EXPECT_EQ(query_id, observed_work_order_multiple_numa_nodes->getQueryID());
   std::vector<int> observed_numa_nodes(
       observed_work_order_multiple_numa_nodes->getPreferredNUMANodes());
   // Look up the expected numa nodes in the observed_numa_nodes vector.
@@ -427,6 +443,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
   const std::size_t kNUMANodes = numa_node_ids.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(kNumOperators, kNUMANodes);
 
   std::vector<std::size_t> operator_ids;
@@ -538,6 +555,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
             curr_operator_id, single_numa_node_id));
     ASSERT_TRUE(observed_work_order_single_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_single_numa =
         normal_workorders_one_numa_ids[curr_operator_id];
@@ -550,6 +568,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
             curr_operator_id, multiple_numa_node_id));
     ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_multiple_numa =
         normal_workorders_multiple_numa_ids[curr_operator_id];
@@ -562,6 +581,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
         static_cast<MockNUMAWorkOrder *>(w.getNormalWorkOrder(curr_operator_id));
     ASSERT_TRUE(observed_work_order_no_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_no_numa =
         normal_workorders_no_numa_ids[curr_operator_id];
@@ -620,6 +640,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
   const std::size_t kNUMANodes = numa_node_ids.size();
 
   // Create the container.
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(kNumOperators, kNUMANodes);
 
   std::vector<std::size_t> operator_ids;
@@ -732,6 +753,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
             curr_operator_id, single_numa_node_id));
     ASSERT_TRUE(observed_work_order_single_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_single_numa =
         rebuild_workorders_one_numa_ids[curr_operator_id];
@@ -744,6 +766,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
             curr_operator_id, multiple_numa_node_id));
     ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
 
+    EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_multiple_numa =
         rebuild_workorders_multiple_numa_ids[curr_operator_id];
@@ -755,6 +778,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
     MockNUMAWorkOrder *observed_work_order_no_numa =
         static_cast<MockNUMAWorkOrder *>(w.getRebuildWorkOrder(curr_operator_id));
 
+    EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
     // Verify if the workorder ID is correct.
     const int expected_workorder_id_no_numa =
         rebuild_workorders_no_numa_ids[curr_operator_id];
@@ -772,6 +796,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
   numa_node_ids.push_back(0);
   const std::size_t kNumWorkOrdersPerType = 100;
 
+  const std::size_t query_id = 0;
   WorkOrdersContainer w(1, 2);
 
   std::vector<int> single_numa_node_workorder_ids;
@@ -820,6 +845,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
     MockNUMAWorkOrder *observed_work_order = static_cast<MockNUMAWorkOrder *>(
         w.getNormalWorkOrder(0, prefer_single_NUMA_node));
     ASSERT_TRUE(observed_work_order != nullptr);
+    EXPECT_EQ(query_id, observed_work_order->getQueryID());
     if (prefer_single_NUMA_node) {
       EXPECT_EQ(*single_numa_it, observed_work_order->getID());
       EXPECT_EQ(1u, observed_work_order->getPreferredNUMANodes().size());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 0630bca..c7fd018 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -105,6 +105,7 @@ class ExecutionGenerator {
         execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
         query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
         execution_heuristics_(new ExecutionHeuristics()) {
+    query_context_proto_->set_query_id(query_handle_->query_id());
 #ifdef QUICKSTEP_DISTRIBUTED
     catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 6ef2a03..5b58f75 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -116,9 +116,11 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                       quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_Worker
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 56b53ba..ea871d0 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -24,7 +24,9 @@
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/Foreman.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_optimizer/ExecutionGenerator.hpp"
 #include "query_optimizer/LogicalGenerator.hpp"
@@ -40,6 +42,8 @@
 
 #include "glog/logging.h"
 
+#include "tmb/tagged_message.h"
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -90,13 +94,18 @@ void ExecutionGeneratorTestRunner::runTestCase(
             physical_generator.generatePlan(
                 logical_generator.generatePlan(*result.parsed_statement));
         execution_generator.generatePlan(physical_plan);
-        foreman_->setQueryPlan(
-            query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
-
-        foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
 
-        foreman_->start();
-        foreman_->join();
+        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+            main_thread_client_id_,
+            foreman_->getBusClientID(),
+            &query_handle,
+            &bus_);
+
+        // Receive workload completion message from Foreman.
+        const AnnotatedMessage annotated_msg =
+            bus_.Receive(main_thread_client_id_, 0, true);
+        const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+        DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
 
         const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
         if (query_result_relation) {