You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:35 UTC

[11/30] incubator-quickstep git commit: Refactored messages processing in both PolicyEnforcer and QueryManager.

Refactored messages processing in both PolicyEnforcer and QueryManager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/db0e7e3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/db0e7e3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/db0e7e3c

Branch: refs/heads/quickstep-28-29
Commit: db0e7e3ccf4c2d631670a4dc0cd2499f9b0294b4
Parents: cf81b5e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Jul 9 14:54:05 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Jul 16 17:23:05 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  | 10 +--
 query_execution/PolicyEnforcer.cpp              | 61 ++++++++++---
 query_execution/PolicyEnforcer.hpp              | 10 ++-
 query_execution/QueryManagerBase.cpp            | 91 ++------------------
 query_execution/QueryManagerBase.hpp            | 90 ++++++++++---------
 query_execution/QueryManagerSingleNode.cpp      |  4 +-
 query_execution/QueryManagerSingleNode.hpp      |  4 +-
 .../tests/QueryManagerSingleNode_unittest.cpp   | 79 +++--------------
 8 files changed, 126 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5a9189c..028531d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -91,8 +91,12 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       ${GFLAGS_LIB_NAME})
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       glog
+                      quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManagerBase
                       quickstep_queryexecution_QueryManagerSingleNode
@@ -100,6 +104,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
@@ -152,14 +157,9 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_QueryManagerBase
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
-                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_relationaloperators_RelationalOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index f310ee1..4cba8c5 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -24,12 +24,19 @@
 #include <unordered_map>
 #include <vector>
 
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/QueryManagerSingleNode.hpp"
 #include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -62,10 +69,9 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
 }
 
 void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
-  // TODO(harshad) : Provide processXMessage() public functions in
-  // QueryManager, so that we need to extract message from the
-  // TaggedMessage only once.
   std::size_t query_id;
+  QueryManagerBase::dag_node_index op_index;
+
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
       serialization::NormalWorkOrderCompletionMessage proto;
@@ -73,12 +79,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       // WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
-      query_id = proto.query_id();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
       if (profile_individual_workorders_) {
         recordTimeForWorkOrder(proto);
       }
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
@@ -87,23 +98,43 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       // rebuild WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
-      query_id = proto.query_id();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
       break;
     }
     case kCatalogRelationNewBlockMessage: {
       serialization::CatalogRelationNewBlockMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      break;
+
+      const block_id block = proto.block_id();
+
+      CatalogRelation *relation =
+          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+      relation->addBlock(block);
+
+      if (proto.has_partition_id()) {
+        relation->getPartitionSchemeMutable()->addBlockToPartition(
+            proto.partition_id(), block);
+      }
+      return;
     }
     case kDataPipelineMessage: {
       serialization::DataPipelineMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      admitted_queries_[query_id]->processDataPipelineMessage(
+          op_index, proto.block_id(), proto.relation_id());
       break;
     }
     case kWorkOrdersAvailableMessage: {
@@ -111,6 +142,12 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+
+      // Check if new work orders are available.
+      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
       break;
     }
     case kWorkOrderFeedbackMessage: {
@@ -118,15 +155,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
           const_cast<void *>(tagged_message.message()),
           tagged_message.message_bytes());
       query_id = msg.header().query_id;
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = msg.header().rel_op_index;
+      admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
       break;
     }
     default:
       LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
   }
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  const QueryManagerBase::QueryStatusCode return_code =
-      admitted_queries_[query_id]->processMessage(tagged_message);
-  if (return_code == QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+  if (admitted_queries_[query_id]->queryStatus(op_index) ==
+          QueryManagerBase::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 79e61d1..8bd6d92 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -33,8 +33,8 @@
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
+
+namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
@@ -43,6 +43,12 @@ class QueryHandle;
 class StorageManager;
 class WorkerDirectory;
 
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
 /**
  * @brief A class that ensures that a high level policy is maintained
  *        in sharing resources among concurrent queries.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f7e183f..37beb02 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -21,13 +21,8 @@
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -39,10 +34,8 @@ using std::pair;
 
 namespace quickstep {
 
-QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
-                                   CatalogDatabaseLite *catalog_database)
+QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
     : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
-      catalog_database_(DCHECK_NOTNULL(catalog_database)),
       query_dag_(DCHECK_NOTNULL(
           DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
       num_operators_in_dag_(query_dag_->size()),
@@ -76,82 +69,8 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
   }
 }
 
-QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
-    const TaggedMessage &tagged_message) {
-  dag_node_index op_index;
-  switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      serialization::NormalWorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processWorkOrderCompleteMessage(proto.operator_index());
-      break;
-    }
-    case kRebuildWorkOrderCompleteMessage: {
-      serialization::RebuildWorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processRebuildWorkOrderCompleteMessage(proto.operator_index());
-      break;
-    }
-    case kCatalogRelationNewBlockMessage: {
-      serialization::CatalogRelationNewBlockMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      const block_id block = proto.block_id();
-
-      CatalogRelation *relation =
-          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-      relation->addBlock(block);
-
-      if (proto.has_partition_id()) {
-        relation->getPartitionSchemeMutable()->addBlockToPartition(
-            proto.partition_id(), block);
-      }
-      return QueryStatusCode::kNone;
-    }
-    case kDataPipelineMessage: {
-      // Possible message senders include InsertDestinations and some
-      // operators which modify existing blocks.
-      serialization::DataPipelineMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processDataPipelineMessage(proto.operator_index(),
-                                 proto.block_id(),
-                                 proto.relation_id());
-      break;
-    }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      fetchNormalWorkOrders(op_index);
-      break;
-    }
-    case kWorkOrderFeedbackMessage: {
-      WorkOrder::FeedbackMessage msg(
-          const_cast<void *>(tagged_message.message()),
-          tagged_message.message_bytes());
-
-      op_index = msg.header().rel_op_index;
-      processFeedbackMessage(msg);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown message type found in QueryManager";
-  }
-
+QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus(
+    const dag_node_index op_index) {
   if (query_exec_state_->hasExecutionFinished(op_index)) {
     return QueryStatusCode::kOperatorExecuted;
   }
@@ -165,9 +84,9 @@ QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
 }
 
 void QueryManagerBase::processFeedbackMessage(
-    const WorkOrder::FeedbackMessage &msg) {
+    const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) {
   RelationalOperator *op =
-      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
+      query_dag_->getNodePayloadMutable(op_index);
   op->receiveFeedbackMessage(msg);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 9e192c8..6edfd5c 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,7 +24,6 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -33,7 +32,6 @@
 
 namespace quickstep {
 
-class CatalogDatabaseLite;
 class QueryHandle;
 
 /** \addtogroup QueryExecution
@@ -50,7 +48,7 @@ class QueryManagerBase {
   typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
 
   /**
-   * @brief Return codes for processMessage() function.
+   * @brief Return codes for queryStatus() function.
    *
    * @note When both operator and query get executed, kQueryExecuted takes
    *       precedence over kOperatorExecuted.
@@ -65,10 +63,8 @@ class QueryManagerBase {
    * @brief Constructor.
    *
    * @param query_handle The QueryHandle object for this query.
-   * @param catalog_database The CatalogDatabse used by the query.
    **/
-  QueryManagerBase(QueryHandle *query_handle,
-                   CatalogDatabaseLite *catalog_database);
+  explicit QueryManagerBase(QueryHandle *query_handle);
 
   /**
    * @brief Virtual destructor.
@@ -76,26 +72,16 @@ class QueryManagerBase {
   virtual ~QueryManagerBase() {}
 
   /**
-   * @brief Process a message sent to the QueryManager.
-   *
-   * @param tagged_message TaggedMessage sent to the QueryManager.
-   *
-   * @return QueryStatusCode as determined after the message is processed.
-   **/
-  QueryStatusCode processMessage(const TaggedMessage &tagged_message);
-
-  /**
    * @brief Get the QueryExecutionState for this query.
    **/
   inline const QueryExecutionState& getQueryExecutionState() const {
     return *query_exec_state_;
   }
 
- protected:
   /**
    * @brief Process the received WorkOrder complete message.
    *
-   * @param node_index The index of the specified operator node in the query DAG
+   * @param op_index The index of the specified operator node in the query DAG
    *        for the completed WorkOrder.
    **/
   void processWorkOrderCompleteMessage(const dag_node_index op_index);
@@ -103,28 +89,15 @@ class QueryManagerBase {
   /**
    * @brief Process the received RebuildWorkOrder complete message.
    *
-   * @param node_index The index of the specified operator node in the query DAG
+   * @param op_index The index of the specified operator node in the query DAG
    *        for the completed RebuildWorkOrder.
    **/
   void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
 
   /**
-   * @brief Process a current relational operator: Get its workorders and store
-   *        them in the WorkOrdersContainer for this query. If the operator can
-   *        be marked as done, do so.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   * @param recursively_check_dependents If an operator is done, should we
-   *        call processOperator on its dependents recursively.
-   **/
-  void processOperator(const dag_node_index index,
-                       const bool recursively_check_dependents);
-
-  /**
    * @brief Process the received data pipeline message.
    *
-   * @param node_index The index of the specified operator node in the query DAG
+   * @param op_index The index of the specified operator node in the query DAG
    *        for the pipelining block.
    * @param block The block id.
    * @param rel_id The ID of the relation that produced 'block'.
@@ -134,12 +107,50 @@ class QueryManagerBase {
                                   const relation_id rel_id);
 
   /**
+   * @brief Fetch all work orders currently available in relational operator and
+   *        store them internally.
+   *
+   * @param index The index of the relational operator to be processed in the
+   *        query plan DAG.
+   *
+   * @return Whether any work order was generated by op.
+   **/
+  virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
+
+  /**
    * @brief Process the received work order feedback message and notify
    *        relational operator.
    *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for the feedback message.
    * @param message Feedback message from work order.
    **/
-  void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
+  void processFeedbackMessage(const dag_node_index op_index,
+                              const WorkOrder::FeedbackMessage &message);
+
+  /**
+   * @brief Get the query status after processing an incoming message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for the incoming message.
+   *
+   * @return QueryStatusCode as determined after the message is processed.
+   **/
+  QueryStatusCode queryStatus(const dag_node_index op_index);
+
+ protected:
+  /**
+   * @brief Process a current relational operator: Get its workorders and store
+   *        them in the WorkOrdersContainer for this query. If the operator can
+   *        be marked as done, do so.
+   *
+   * @param index The index of the relational operator to be processed in the
+   *        query plan DAG.
+   * @param recursively_check_dependents If an operator is done, should we
+   *        call processOperator on its dependents recursively.
+   **/
+  void processOperator(const dag_node_index index,
+                       const bool recursively_check_dependents);
 
   /**
    * @brief This function does the following things:
@@ -241,8 +252,6 @@ class QueryManagerBase {
 
   const std::size_t query_id_;
 
-  CatalogDatabaseLite *catalog_database_;
-
   DAG<RelationalOperator, bool> *query_dag_;
   const dag_node_index num_operators_in_dag_;
 
@@ -256,17 +265,6 @@ class QueryManagerBase {
 
  private:
   /**
-   * @brief Fetch all work orders currently available in relational operator and
-   *        store them internally.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   *
-   * @return Whether any work order was generated by op.
-   **/
-  virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
-
-  /**
    * @brief Check if the given operator's normal execution is over.
    *
    * @note The conditions for a given operator's normal execution to get over:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 193b188..12f8ff5 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -46,12 +46,12 @@ QueryManagerSingleNode::QueryManagerSingleNode(
     CatalogDatabaseLite *catalog_database,
     StorageManager *storage_manager,
     tmb::MessageBus *bus)
-    : QueryManagerBase(query_handle, catalog_database),
+    : QueryManagerBase(query_handle),
       foreman_client_id_(foreman_client_id),
       storage_manager_(DCHECK_NOTNULL(storage_manager)),
       bus_(DCHECK_NOTNULL(bus)),
       query_context_(new QueryContext(query_handle->getQueryContextProto(),
-                                      *catalog_database_,
+                                      *catalog_database,
                                       storage_manager_,
                                       foreman_client_id_,
                                       bus_)),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index 5533f06..b9bad2e 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -68,6 +68,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
 
   ~QueryManagerSingleNode() override {}
 
+  bool fetchNormalWorkOrders(const dag_node_index index) override;
+
  /**
    * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
    *
@@ -91,8 +93,6 @@ class QueryManagerSingleNode final : public QueryManagerBase {
   }
 
  private:
-  bool fetchNormalWorkOrders(const dag_node_index index) override;
-
   bool checkNormalExecutionOver(const dag_node_index index) const override {
     return (checkAllDependenciesMet(index) &&
             !workorders_container_->hasNormalWorkOrder(index) &&

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 52cee20..39ca58c 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -16,7 +16,6 @@
  **/
 
 #include <climits>
-#include <cstdlib>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -26,7 +25,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryManagerSingleNode.hpp"
@@ -49,7 +47,6 @@
 #include "gtest/gtest.h"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/tagged_message.h"
 
 namespace tmb { class MessageBus; }
 
@@ -254,89 +251,33 @@ class QueryManagerTest : public ::testing::Test {
 
   inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
     VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(source_operator_index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(tagged_message);
+
+    query_manager_->processDataPipelineMessage(source_operator_index,
+                                               0 /* dummy block ID */,
+                                               0 /* dummy relation ID */);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }
 
   inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
-    TaggedMessage tagged_message;
-    serialization::NormalWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kWorkOrderCompleteMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(message);
 
+    query_manager_->processWorkOrderCompleteMessage(index);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }
 
   inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
-    serialization::RebuildWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker thread ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kRebuildWorkOrderCompleteMessage);
-
-    std::free(proto_bytes);
-    query_manager_->processMessage(message);
 
+    query_manager_->processRebuildWorkOrderCompleteMessage(index);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }
 
   inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(tagged_message);
+
+    query_manager_->processDataPipelineMessage(index,
+                                               0 /* dummy block ID */,
+                                               0 /* dummy relation ID */);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }