You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:35 UTC
[11/30] incubator-quickstep git commit: Refactored messages processing in both PolicyEnforcer and QueryManager.
Refactored messages processing in both PolicyEnforcer and QueryManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/db0e7e3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/db0e7e3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/db0e7e3c
Branch: refs/heads/quickstep-28-29
Commit: db0e7e3ccf4c2d631670a4dc0cd2499f9b0294b4
Parents: cf81b5e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Jul 9 14:54:05 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Jul 16 17:23:05 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 10 +--
query_execution/PolicyEnforcer.cpp | 61 ++++++++++---
query_execution/PolicyEnforcer.hpp | 10 ++-
query_execution/QueryManagerBase.cpp | 91 ++------------------
query_execution/QueryManagerBase.hpp | 90 ++++++++++---------
query_execution/QueryManagerSingleNode.cpp | 4 +-
query_execution/QueryManagerSingleNode.hpp | 4 +-
.../tests/QueryManagerSingleNode_unittest.cpp | 79 +++--------------
8 files changed, 126 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5a9189c..028531d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -91,8 +91,12 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
${GFLAGS_LIB_NAME})
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManagerBase
quickstep_queryexecution_QueryManagerSingleNode
@@ -100,6 +104,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_relationaloperators_WorkOrder
+ quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
@@ -152,14 +157,9 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_queryexecution_QueryManagerBase
- quickstep_catalog_CatalogDatabase
- quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
- quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
- quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
- quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryPlan
quickstep_relationaloperators_RelationalOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index f310ee1..4cba8c5 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -24,12 +24,19 @@
#include <unordered_map>
#include <vector>
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/QueryManagerSingleNode.hpp"
#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -62,10 +69,9 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
}
void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
- // TODO(harshad) : Provide processXMessage() public functions in
- // QueryManager, so that we need to extract message from the
- // TaggedMessage only once.
std::size_t query_id;
+ QueryManagerBase::dag_node_index op_index;
+
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
serialization::NormalWorkOrderCompletionMessage proto;
@@ -73,12 +79,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
// WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
- query_id = proto.query_id();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
if (profile_individual_workorders_) {
recordTimeForWorkOrder(proto);
}
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
break;
}
case kRebuildWorkOrderCompleteMessage: {
@@ -87,23 +98,43 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
// rebuild WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
- query_id = proto.query_id();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
break;
}
case kCatalogRelationNewBlockMessage: {
serialization::CatalogRelationNewBlockMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
- query_id = proto.query_id();
- break;
+
+ const block_id block = proto.block_id();
+
+ CatalogRelation *relation =
+ static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+ relation->addBlock(block);
+
+ if (proto.has_partition_id()) {
+ relation->getPartitionSchemeMutable()->addBlockToPartition(
+ proto.partition_id(), block);
+ }
+ return;
}
case kDataPipelineMessage: {
serialization::DataPipelineMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ admitted_queries_[query_id]->processDataPipelineMessage(
+ op_index, proto.block_id(), proto.relation_id());
break;
}
case kWorkOrdersAvailableMessage: {
@@ -111,6 +142,12 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+
+ // Check if new work orders are available.
+ admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
break;
}
case kWorkOrderFeedbackMessage: {
@@ -118,15 +155,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
const_cast<void *>(tagged_message.message()),
tagged_message.message_bytes());
query_id = msg.header().query_id;
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = msg.header().rel_op_index;
+ admitted_queries_[query_id]->processFeedbackMessage(op_index, msg);
break;
}
default:
LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
}
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- const QueryManagerBase::QueryStatusCode return_code =
- admitted_queries_[query_id]->processMessage(tagged_message);
- if (return_code == QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+ if (admitted_queries_[query_id]->queryStatus(op_index) ==
+ QueryManagerBase::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 79e61d1..8bd6d92 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -33,8 +33,8 @@
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
+
+namespace tmb { class MessageBus; }
namespace quickstep {
@@ -43,6 +43,12 @@ class QueryHandle;
class StorageManager;
class WorkerDirectory;
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
/**
* @brief A class that ensures that a high level policy is maintained
* in sharing resources among concurrent queries.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f7e183f..37beb02 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -21,13 +21,8 @@
#include <utility>
#include <vector>
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -39,10 +34,8 @@ using std::pair;
namespace quickstep {
-QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
- CatalogDatabaseLite *catalog_database)
+QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
: query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
- catalog_database_(DCHECK_NOTNULL(catalog_database)),
query_dag_(DCHECK_NOTNULL(
DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
num_operators_in_dag_(query_dag_->size()),
@@ -76,82 +69,8 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
}
}
-QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
- const TaggedMessage &tagged_message) {
- dag_node_index op_index;
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processWorkOrderCompleteMessage(proto.operator_index());
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processRebuildWorkOrderCompleteMessage(proto.operator_index());
- break;
- }
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(
- proto.partition_id(), block);
- }
- return QueryStatusCode::kNone;
- }
- case kDataPipelineMessage: {
- // Possible message senders include InsertDestinations and some
- // operators which modify existing blocks.
- serialization::DataPipelineMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processDataPipelineMessage(proto.operator_index(),
- proto.block_id(),
- proto.relation_id());
- break;
- }
- case kWorkOrdersAvailableMessage: {
- serialization::WorkOrdersAvailableMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
-
- // Check if new work orders are available.
- fetchNormalWorkOrders(op_index);
- break;
- }
- case kWorkOrderFeedbackMessage: {
- WorkOrder::FeedbackMessage msg(
- const_cast<void *>(tagged_message.message()),
- tagged_message.message_bytes());
-
- op_index = msg.header().rel_op_index;
- processFeedbackMessage(msg);
- break;
- }
- default:
- LOG(FATAL) << "Unknown message type found in QueryManager";
- }
-
+QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus(
+ const dag_node_index op_index) {
if (query_exec_state_->hasExecutionFinished(op_index)) {
return QueryStatusCode::kOperatorExecuted;
}
@@ -165,9 +84,9 @@ QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
}
void QueryManagerBase::processFeedbackMessage(
- const WorkOrder::FeedbackMessage &msg) {
+ const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) {
RelationalOperator *op =
- query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
+ query_dag_->getNodePayloadMutable(op_index);
op->receiveFeedbackMessage(msg);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 9e192c8..6edfd5c 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,7 +24,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -33,7 +32,6 @@
namespace quickstep {
-class CatalogDatabaseLite;
class QueryHandle;
/** \addtogroup QueryExecution
@@ -50,7 +48,7 @@ class QueryManagerBase {
typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
/**
- * @brief Return codes for processMessage() function.
+ * @brief Return codes for queryStatus() function.
*
* @note When both operator and query get executed, kQueryExecuted takes
* precedence over kOperatorExecuted.
@@ -65,10 +63,8 @@ class QueryManagerBase {
* @brief Constructor.
*
* @param query_handle The QueryHandle object for this query.
- * @param catalog_database The CatalogDatabse used by the query.
**/
- QueryManagerBase(QueryHandle *query_handle,
- CatalogDatabaseLite *catalog_database);
+ explicit QueryManagerBase(QueryHandle *query_handle);
/**
* @brief Virtual destructor.
@@ -76,26 +72,16 @@ class QueryManagerBase {
virtual ~QueryManagerBase() {}
/**
- * @brief Process a message sent to the QueryManager.
- *
- * @param tagged_message TaggedMessage sent to the QueryManager.
- *
- * @return QueryStatusCode as determined after the message is processed.
- **/
- QueryStatusCode processMessage(const TaggedMessage &tagged_message);
-
- /**
* @brief Get the QueryExecutionState for this query.
**/
inline const QueryExecutionState& getQueryExecutionState() const {
return *query_exec_state_;
}
- protected:
/**
* @brief Process the received WorkOrder complete message.
*
- * @param node_index The index of the specified operator node in the query DAG
+ * @param op_index The index of the specified operator node in the query DAG
* for the completed WorkOrder.
**/
void processWorkOrderCompleteMessage(const dag_node_index op_index);
@@ -103,28 +89,15 @@ class QueryManagerBase {
/**
* @brief Process the received RebuildWorkOrder complete message.
*
- * @param node_index The index of the specified operator node in the query DAG
+ * @param op_index The index of the specified operator node in the query DAG
* for the completed RebuildWorkOrder.
**/
void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
/**
- * @brief Process a current relational operator: Get its workorders and store
- * them in the WorkOrdersContainer for this query. If the operator can
- * be marked as done, do so.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- * @param recursively_check_dependents If an operator is done, should we
- * call processOperator on its dependents recursively.
- **/
- void processOperator(const dag_node_index index,
- const bool recursively_check_dependents);
-
- /**
* @brief Process the received data pipeline message.
*
- * @param node_index The index of the specified operator node in the query DAG
+ * @param op_index The index of the specified operator node in the query DAG
* for the pipelining block.
* @param block The block id.
* @param rel_id The ID of the relation that produced 'block'.
@@ -134,12 +107,50 @@ class QueryManagerBase {
const relation_id rel_id);
/**
+ * @brief Fetch all work orders currently available in relational operator and
+ * store them internally.
+ *
+ * @param index The index of the relational operator to be processed in the
+ * query plan DAG.
+ *
+ * @return Whether any work order was generated by op.
+ **/
+ virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
+
+ /**
* @brief Process the received work order feedback message and notify
* relational operator.
*
+ * @param op_index The index of the specified operator node in the query DAG
+ * for the feedback message.
* @param message Feedback message from work order.
**/
- void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
+ void processFeedbackMessage(const dag_node_index op_index,
+ const WorkOrder::FeedbackMessage &message);
+
+ /**
+ * @brief Get the query status after processing an incoming message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for the incoming message.
+ *
+ * @return QueryStatusCode as determined after the message is processed.
+ **/
+ QueryStatusCode queryStatus(const dag_node_index op_index);
+
+ protected:
+ /**
+ * @brief Process a current relational operator: Get its workorders and store
+ * them in the WorkOrdersContainer for this query. If the operator can
+ * be marked as done, do so.
+ *
+ * @param index The index of the relational operator to be processed in the
+ * query plan DAG.
+ * @param recursively_check_dependents If an operator is done, should we
+ * call processOperator on its dependents recursively.
+ **/
+ void processOperator(const dag_node_index index,
+ const bool recursively_check_dependents);
/**
* @brief This function does the following things:
@@ -241,8 +252,6 @@ class QueryManagerBase {
const std::size_t query_id_;
- CatalogDatabaseLite *catalog_database_;
-
DAG<RelationalOperator, bool> *query_dag_;
const dag_node_index num_operators_in_dag_;
@@ -256,17 +265,6 @@ class QueryManagerBase {
private:
/**
- * @brief Fetch all work orders currently available in relational operator and
- * store them internally.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- *
- * @return Whether any work order was generated by op.
- **/
- virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
-
- /**
* @brief Check if the given operator's normal execution is over.
*
* @note The conditions for a given operator's normal execution to get over:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 193b188..12f8ff5 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -46,12 +46,12 @@ QueryManagerSingleNode::QueryManagerSingleNode(
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
tmb::MessageBus *bus)
- : QueryManagerBase(query_handle, catalog_database),
+ : QueryManagerBase(query_handle),
foreman_client_id_(foreman_client_id),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
bus_(DCHECK_NOTNULL(bus)),
query_context_(new QueryContext(query_handle->getQueryContextProto(),
- *catalog_database_,
+ *catalog_database,
storage_manager_,
foreman_client_id_,
bus_)),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index 5533f06..b9bad2e 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -68,6 +68,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
~QueryManagerSingleNode() override {}
+ bool fetchNormalWorkOrders(const dag_node_index index) override;
+
/**
* @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
*
@@ -91,8 +93,6 @@ class QueryManagerSingleNode final : public QueryManagerBase {
}
private:
- bool fetchNormalWorkOrders(const dag_node_index index) override;
-
bool checkNormalExecutionOver(const dag_node_index index) const override {
return (checkAllDependenciesMet(index) &&
!workorders_container_->hasNormalWorkOrder(index) &&
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/db0e7e3c/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 52cee20..39ca58c 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -16,7 +16,6 @@
**/
#include <climits>
-#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>
@@ -26,7 +25,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryManagerSingleNode.hpp"
@@ -49,7 +47,6 @@
#include "gtest/gtest.h"
#include "tmb/id_typedefs.h"
-#include "tmb/tagged_message.h"
namespace tmb { class MessageBus; }
@@ -254,89 +251,33 @@ class QueryManagerTest : public ::testing::Test {
inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
- serialization::DataPipelineMessage proto;
- proto.set_operator_index(source_operator_index);
-
- proto.set_block_id(0); // dummy block ID
- proto.set_relation_id(0); // dummy relation ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const std::size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
- proto_length,
- kDataPipelineMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(tagged_message);
+
+ query_manager_->processDataPipelineMessage(source_operator_index,
+ 0 /* dummy block ID */,
+ 0 /* dummy relation ID */);
return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
}
inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
- TaggedMessage tagged_message;
- serialization::NormalWorkOrderCompletionMessage proto;
- proto.set_operator_index(index);
- proto.set_worker_thread_index(1); // dummy worker ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kWorkOrderCompleteMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(message);
+ query_manager_->processWorkOrderCompleteMessage(index);
return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
}
inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
- serialization::RebuildWorkOrderCompletionMessage proto;
- proto.set_operator_index(index);
- proto.set_worker_thread_index(1); // dummy worker thread ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kRebuildWorkOrderCompleteMessage);
-
- std::free(proto_bytes);
- query_manager_->processMessage(message);
+ query_manager_->processRebuildWorkOrderCompleteMessage(index);
return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
}
inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
- serialization::DataPipelineMessage proto;
- proto.set_operator_index(index);
-
- proto.set_block_id(0); // dummy block ID
- proto.set_relation_id(0); // dummy relation ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const std::size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
- proto_length,
- kDataPipelineMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(tagged_message);
+
+ query_manager_->processDataPipelineMessage(index,
+ 0 /* dummy block ID */,
+ 0 /* dummy relation ID */);
return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
}