You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/14 19:05:07 UTC
incubator-quickstep git commit: Measure execution time of normal
WorkOrders.
Repository: incubator-quickstep
Updated Branches:
refs/heads/record-wo-execution-time [created] 7e7695663
Measure execution time of normal WorkOrders.
- Measure the execution time of normal WorkOrders in the Worker.
- Split the WorkOrderCompletionMessage proto message in two:
NormalWorkOrderCompletionMessage and RebuildWorkOrderCompletionMessage.
- Add an execution time field (execution_time_us) to the NormalWorkOrderCompletionMessage proto message.
- Include the recorded time in the NormalWorkOrderCompletionMessage proto message
that is sent back to Foreman.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7e769566
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7e769566
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7e769566
Branch: refs/heads/record-wo-execution-time
Commit: 7e7695663e61e60c2bed798ac3ff0f59d57db4ea
Parents: 29768a7
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 14 14:02:45 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jun 14 14:02:45 2016 -0500
----------------------------------------------------------------------
query_execution/PolicyEnforcer.cpp | 10 ++-
query_execution/QueryExecutionMessages.proto | 15 ++--
query_execution/QueryManager.cpp | 4 +-
query_execution/Worker.cpp | 93 ++++++++++++++------
query_execution/Worker.hpp | 40 +++++++--
query_execution/tests/QueryManager_unittest.cpp | 9 +-
6 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4309cb8 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -66,9 +66,15 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
// TaggedMessage only once.
std::size_t query_id;
switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: // Fall through.
+ case kWorkOrderCompleteMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 9d9a9e5..3523694 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,11 +20,16 @@ package quickstep.serialization;
message EmptyMessage {
}
-// Used for both Normal WorkOrders and RebuildWorkOrders.
-// NOTE(zuyu): we might need to seperate the completion messages to contain
-// run-time information for Foreman to make better decisions on scheduling
-// WorkOrders.
-message WorkOrderCompletionMessage {
+// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
+message NormalWorkOrderCompletionMessage {
+ required uint64 operator_index = 1;
+ required uint64 worker_thread_index = 2;
+ required uint64 query_id = 3;
+ optional uint64 execution_time_us = 4; // Execution time of the WorkOrder, in microseconds.
+}
+
+// A message sent upon completion of a rebuild WorkOrder execution.
+message RebuildWorkOrderCompletionMessage {
required uint64 operator_index = 1;
required uint64 worker_thread_index = 2;
required uint64 query_id = 3;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index e4e4c9d..d20b592 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -161,7 +161,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
dag_node_index op_index;
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::NormalWorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
@@ -170,7 +170,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
break;
}
case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ef596e1..d327cd2 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -16,7 +16,9 @@
#include "query_execution/Worker.hpp"
+#include <chrono>
#include <cstddef>
+#include <cstdint>
#include <cstdlib>
#include <utility>
@@ -47,13 +49,32 @@ void Worker::run() {
}
ClientIDMap *thread_id_map = ClientIDMap::Instance();
thread_id_map->addValue(worker_client_id_);
+ std::chrono::time_point<std::chrono::steady_clock> start, end;
for (;;) {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
const AnnotatedMessage annotated_msg = bus_->Receive(worker_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
switch (tagged_message.message_type()) {
- case kWorkOrderMessage: // Fall through.
+      case kWorkOrderMessage: {
+        WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
+        DCHECK(message.getWorkOrder() != nullptr);
+        // Time only execute(), using the monotonic steady_clock.
+        start = std::chrono::steady_clock::now();
+        message.getWorkOrder()->execute();
+        end = std::chrono::steady_clock::now();
+        const std::size_t query_id_for_workorder =
+            message.getWorkOrder()->getQueryID();
+        delete message.getWorkOrder();
+        const uint64_t time_us =  // Microseconds, to match execution_time_us.
+            std::chrono::duration_cast<std::chrono::microseconds>(end - start)
+                .count();
+        sendNormalWorkOrderCompleteMessage(annotated_msg.sender,
+                                           message.getRelationalOpIndex(),
+                                           query_id_for_workorder,
+                                           time_us);
+        break;
+      }
case kRebuildWorkOrderMessage: {
WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
DCHECK(message.getWorkOrder() != nullptr);
@@ -62,10 +83,9 @@ void Worker::run() {
message.getWorkOrder()->getQueryID();
delete message.getWorkOrder();
- sendWorkOrderCompleteMessage(
- annotated_msg.sender, message.getRelationalOpIndex(),
- query_id_for_workorder,
- tagged_message.message_type() == kRebuildWorkOrderMessage);
+ sendRebuildWorkOrderCompleteMessage(annotated_msg.sender,
+ message.getRelationalOpIndex(),
+ query_id_for_workorder);
break;
}
case kPoisonMessage: {
@@ -77,34 +97,57 @@ void Worker::run() {
}
}
-void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const size_t op_index,
- const size_t query_id,
- const bool is_rebuild_work_order) {
- serialization::WorkOrderCompletionMessage proto;
- proto.set_operator_index(op_index);
- proto.set_worker_thread_index(worker_thread_index_);
- proto.set_query_id(query_id);
-
+template <typename CompletionMessageProtoT>
+void Worker::sendWorkOrderCompleteMessageHelper(
+ const CompletionMessageProtoT &proto,
+ const tmb::client_id receiver,
+ const message_type_id message_type) {
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+ char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
+ TaggedMessage message(static_cast<const void *>(proto_bytes),
proto_length,
- is_rebuild_work_order ? kRebuildWorkOrderCompleteMessage
- : kWorkOrderCompleteMessage);
+ message_type);
std::free(proto_bytes);
const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- worker_client_id_,
- receiver,
- std::move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
- "be sent from worker with TMB client ID " << worker_client_id_ << " to "
- "Foreman with TMB client ID " << receiver;
+ QueryExecutionUtil::SendTMBMessage(
+ bus_, worker_client_id_, receiver, std::move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from worker with TMB client ID "
+ << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+}
+
+void Worker::sendNormalWorkOrderCompleteMessage(
+ const tmb::client_id receiver,
+ const size_t op_index,
+ const size_t query_id,
+ const uint64_t execution_time_us) {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ proto.set_operator_index(op_index);
+ proto.set_worker_thread_index(worker_thread_index_);
+ proto.set_query_id(query_id);
+ proto.set_execution_time_us(execution_time_us);
+
+ sendWorkOrderCompleteMessageHelper<
+ serialization::NormalWorkOrderCompletionMessage>(
+ proto, receiver, kWorkOrderCompleteMessage);
+}
+
+void Worker::sendRebuildWorkOrderCompleteMessage(
+ const tmb::client_id receiver,
+ const size_t op_index,
+ const size_t query_id) {
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ proto.set_operator_index(op_index);
+ proto.set_worker_thread_index(worker_thread_index_);
+ proto.set_query_id(query_id);
+
+ sendWorkOrderCompleteMessageHelper<
+ serialization::RebuildWorkOrderCompletionMessage>(
+ proto, receiver, kRebuildWorkOrderCompleteMessage);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index c0bafdc..1fea2ea 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -19,6 +19,7 @@
#define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
#include <cstddef>
+#include <cstdint>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "threading/Thread.hpp"
@@ -93,18 +94,43 @@ class Worker : public Thread {
private:
/**
- * @brief Send the response WorkOrder completion message.
+ * @brief Construct and send a normal WorkOrder completion response message.
*
* @param receiver The id of the TMB client which should receive the response.
* @param op_index The index of the operator to which the WorkOrder belongs.
* @param query_id The ID of the query which the WorkOrder belongs to.
- * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
- * false.
+ * @param execution_time_us The execution time of the WorkOrder in
+ * microseconds.
**/
- void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const std::size_t op_index,
- const std::size_t query_id,
- const bool is_rebuild_work_order);
+ void sendNormalWorkOrderCompleteMessage(const tmb::client_id receiver,
+ const size_t op_index,
+ const size_t query_id,
+ const uint64_t execution_time_us);
+
+ /**
+ * @brief Construct and send a rebuild WorkOrder completion response message.
+ *
+ * @param receiver The id of the TMB client which should receive the response.
+ * @param op_index The index of the operator to which the WorkOrder belongs.
+ * @param query_id The ID of the query which the WorkOrder belongs to.
+ **/
+ void sendRebuildWorkOrderCompleteMessage(const tmb::client_id receiver,
+ const size_t op_index,
+ const size_t query_id);
+
+ /**
+ * @brief A helper method to send the WorkOrder completion message.
+ *
+ * @note CompletionMessageProtoT is the type of the completion message.
+ *
+ * @param proto The proto message to be sent.
+ * @param receiver The TMB client ID of the receiver.
+ * @param message_type The ID of the type of the message being sent.
+ **/
+ template <typename CompletionMessageProtoT>
+ void sendWorkOrderCompleteMessageHelper(const CompletionMessageProtoT &proto,
+ const tmb::client_id receiver,
+ const message_type_id message_type);
const std::size_t worker_thread_index_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e769566/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 4f98748..37e2cdd 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -275,7 +275,7 @@ class QueryManagerTest : public ::testing::Test {
inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
TaggedMessage tagged_message;
- serialization::WorkOrderCompletionMessage proto;
+ serialization::NormalWorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker ID.
proto.set_query_id(0); // dummy query ID.
@@ -296,8 +296,7 @@ class QueryManagerTest : public ::testing::Test {
inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
- // foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker thread ID.
proto.set_query_id(0); // dummy query ID.
@@ -346,7 +345,6 @@ class QueryManagerTest : public ::testing::Test {
unique_ptr<QueryHandle> query_handle_;
unique_ptr<QueryManager> query_manager_;
- // unique_ptr<Foreman> foreman_;
MessageBusImpl bus_;
client_id worker_client_id_;
@@ -357,7 +355,6 @@ class QueryManagerTest : public ::testing::Test {
TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
// This test creates a DAG of a single node. No workorders are generated.
query_plan_->addRelationalOperator(new MockOperator(false, false));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(0));
@@ -377,7 +374,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
// This test creates a DAG of a single node. Static workorders are generated.
const QueryPlan::DAGNodeIndex id =
query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -429,7 +425,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
// scaffolding of mocking. If we use gMock, we can do much better.
const QueryPlan::DAGNodeIndex id =
query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(id));