You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/06/27 21:33:37 UTC
[02/18] incubator-quickstep git commit: Measure execution time of
WorkOrders.
Measure execution time of WorkOrders.
- Measure order execution time for both normal and rebuild work
orders.
- Split the WorkOrderCompletion proto message in two: Normal and Rebuild
work orders, with a possibility that in the future the two messages
will have different fields.
- Add execution time field in both WorkOrderComplete proto messages.
- Include the recorded time in both WorkOrderComplete proto messages
that are sent back to Foreman.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8e825f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8e825f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8e825f15
Branch: refs/heads/travis-grpc
Commit: 8e825f1525d5080e6c4bd02ee2ce0e14d81ab6c5
Parents: a701362
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 14 14:02:45 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 15 15:59:35 2016 -0500
----------------------------------------------------------------------
query_execution/PolicyEnforcer.cpp | 23 ++++-
query_execution/QueryExecutionMessages.proto | 29 +++++--
query_execution/QueryManager.cpp | 4 +-
query_execution/Worker.cpp | 89 ++++++++++++--------
query_execution/Worker.hpp | 38 +++++++--
query_execution/tests/QueryManager_unittest.cpp | 9 +-
6 files changed, 132 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 4501026..9f0502d 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -67,13 +67,26 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
// TaggedMessage only once.
std::size_t query_id;
switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: // Fall through.
+ case kWorkOrderCompleteMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ worker_directory_->decrementNumQueuedWorkOrders(
+ proto.worker_thread_index());
+ break;
+ }
case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // rebuild WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
- worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
+ worker_directory_->decrementNumQueuedWorkOrders(
+ proto.worker_thread_index());
break;
}
case kCatalogRelationNewBlockMessage: {
@@ -98,7 +111,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
break;
}
case kWorkOrderFeedbackMessage: {
- WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
+ WorkOrder::FeedbackMessage msg(
+ const_cast<void *>(tagged_message.message()),
+ tagged_message.message_bytes());
query_id = msg.header().query_id;
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 9d9a9e5..65a8946 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,14 +20,33 @@ package quickstep.serialization;
message EmptyMessage {
}
-// Used for both Normal WorkOrders and RebuildWorkOrders.
-// NOTE(zuyu): we might need to seperate the completion messages to contain
-// run-time information for Foreman to make better decisions on scheduling
-// WorkOrders.
-message WorkOrderCompletionMessage {
+// Note: There are different types of completion messages for normal work orders
+// rebuild work orders. This can be potentially helpful when we want to collect
+// different statistics for executing different types of work orders.
+// e.g. In select normal work order completion message, we could be interested
+// in the selectivity of the block whose work order got execute. In rebuild work
+// order completion message, we may be interested in adding the compression
+// ratio or dictionary size of the rebuilt block.
+
+// TODO(harshad) : If there are different fields in the two message types below,
+// create a base message class called WorkOrderCompletionMessage and make the
+// two classes below extend the base class. All the common fields in both the
+// classes can be moved to the base class.
+
+// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
+message NormalWorkOrderCompletionMessage {
+ required uint64 operator_index = 1;
+ required uint64 worker_thread_index = 2;
+ required uint64 query_id = 3;
+ optional uint64 execution_time_in_microseconds = 4;
+}
+
+// A message sent upon completion of a rebuild WorkOrder execution.
+message RebuildWorkOrderCompletionMessage {
required uint64 operator_index = 1;
required uint64 worker_thread_index = 2;
required uint64 query_id = 3;
+ optional uint64 execution_time_in_microseconds = 4;
}
message CatalogRelationNewBlockMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index e4e4c9d..d20b592 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -161,7 +161,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
dag_node_index op_index;
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::NormalWorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
@@ -170,7 +170,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
break;
}
case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ef596e1..ae889c7 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -16,7 +16,9 @@
#include "query_execution/Worker.hpp"
+#include <chrono>
#include <cstddef>
+#include <cstdint>
#include <cstdlib>
#include <utility>
@@ -36,6 +38,7 @@
#include "tmb/tagged_message.h"
using std::size_t;
+using std::uint64_t;
using tmb::TaggedMessage;
@@ -50,22 +53,27 @@ void Worker::run() {
for (;;) {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
- const AnnotatedMessage annotated_msg = bus_->Receive(worker_client_id_, 0, true);
+ const AnnotatedMessage annotated_msg =
+ bus_->Receive(worker_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
switch (tagged_message.message_type()) {
- case kWorkOrderMessage: // Fall through.
+ case kWorkOrderMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ executeWorkOrderHelper<serialization::NormalWorkOrderCompletionMessage>(
+ tagged_message, &proto);
+ sendWorkOrderCompleteMessage<
+ serialization::NormalWorkOrderCompletionMessage>(
+ annotated_msg.sender, proto, kWorkOrderCompleteMessage);
+ break;
+ }
case kRebuildWorkOrderMessage: {
- WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
- DCHECK(message.getWorkOrder() != nullptr);
- message.getWorkOrder()->execute();
- const std::size_t query_id_for_workorder =
- message.getWorkOrder()->getQueryID();
- delete message.getWorkOrder();
-
- sendWorkOrderCompleteMessage(
- annotated_msg.sender, message.getRelationalOpIndex(),
- query_id_for_workorder,
- tagged_message.message_type() == kRebuildWorkOrderMessage);
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ executeWorkOrderHelper<
+ serialization::RebuildWorkOrderCompletionMessage>(tagged_message,
+ &proto);
+ sendWorkOrderCompleteMessage<
+ serialization::RebuildWorkOrderCompletionMessage>(
+ annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
break;
}
case kPoisonMessage: {
@@ -77,34 +85,49 @@ void Worker::run() {
}
}
+template <typename CompletionMessageProtoT>
void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const size_t op_index,
- const size_t query_id,
- const bool is_rebuild_work_order) {
- serialization::WorkOrderCompletionMessage proto;
- proto.set_operator_index(op_index);
- proto.set_worker_thread_index(worker_thread_index_);
- proto.set_query_id(query_id);
-
+ const CompletionMessageProtoT &proto,
+ const message_type_id message_type) {
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+ char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- is_rebuild_work_order ? kRebuildWorkOrderCompleteMessage
- : kWorkOrderCompleteMessage);
+ TaggedMessage tagged_message(
+ static_cast<const void *>(proto_bytes), proto_length, message_type);
std::free(proto_bytes);
const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- worker_client_id_,
- receiver,
- std::move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
- "be sent from worker with TMB client ID " << worker_client_id_ << " to "
- "Foreman with TMB client ID " << receiver;
+ QueryExecutionUtil::SendTMBMessage(
+ bus_, worker_client_id_, receiver, std::move(tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from worker with TMB client ID "
+ << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+}
+
+template <typename CompletionMessageProtoT>
+void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
+ CompletionMessageProtoT *proto) {
+ std::chrono::time_point<std::chrono::steady_clock> start, end;
+ WorkerMessage worker_message(
+ *static_cast<const WorkerMessage *>(tagged_message.message()));
+ DCHECK(worker_message.getWorkOrder() != nullptr);
+ const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
+
+ // Start measuring the execution time.
+ start = std::chrono::steady_clock::now();
+ worker_message.getWorkOrder()->execute();
+ end = std::chrono::steady_clock::now();
+ delete worker_message.getWorkOrder();
+ const uint64_t execution_time_microseconds =
+ std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
+ .count();
+ // Construct the proto message.
+ proto->set_operator_index(worker_message.getRelationalOpIndex());
+ proto->set_query_id(query_id_for_workorder);
+ proto->set_worker_thread_index(worker_thread_index_);
+ proto->set_execution_time_in_microseconds(execution_time_microseconds);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index c0bafdc..71d3ec9 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -19,6 +19,7 @@
#define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
#include <cstddef>
+#include <cstdint>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "threading/Thread.hpp"
@@ -27,6 +28,8 @@
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
+namespace tmb { class TaggedMessge; }
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -93,18 +96,35 @@ class Worker : public Thread {
private:
/**
- * @brief Send the response WorkOrder completion message.
+ * @brief A helper method to execute the WorkOrder and construct a
+ * completion message.
+ *
+ * @note CompletionMessageProtoT is the type of the completion message.
+ * @note Right now a single helper method works for all message types.
+ * If different message types need to collect different statistics for
+ * the WorkOrder execution, we need to create different helper methods,
+ * one for each message type.
+ *
+ * @param tagged_message The TaggedMessage which consists of the WorkOrder.
+ * @param proto The proto message to be sent.
+ **/
+ template <typename CompletionMessageProtoT>
+ void executeWorkOrderHelper(const TaggedMessage &tagged_message,
+ CompletionMessageProtoT *proto);
+
+ /**
+ * @brief A helper method to send the WorkOrder completion message.
+ *
+ * @note CompletionMessageProtoT is the type of the completion message.
*
- * @param receiver The id of the TMB client which should receive the response.
- * @param op_index The index of the operator to which the WorkOrder belongs.
- * @param query_id The ID of the query which the WorkOrder belongs to.
- * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
- * false.
+ * @param receiver The TMB client ID of the receiver.
+ * @param proto The proto message to be sent.
+ * @param message_type The ID of the type of the message being sent.
**/
+ template <typename CompletionMessageProtoT>
void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const std::size_t op_index,
- const std::size_t query_id,
- const bool is_rebuild_work_order);
+ const CompletionMessageProtoT &proto,
+ const message_type_id message_type);
const std::size_t worker_thread_index_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 4f98748..37e2cdd 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -275,7 +275,7 @@ class QueryManagerTest : public ::testing::Test {
inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
TaggedMessage tagged_message;
- serialization::WorkOrderCompletionMessage proto;
+ serialization::NormalWorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker ID.
proto.set_query_id(0); // dummy query ID.
@@ -296,8 +296,7 @@ class QueryManagerTest : public ::testing::Test {
inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
- // foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
- serialization::WorkOrderCompletionMessage proto;
+ serialization::RebuildWorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker thread ID.
proto.set_query_id(0); // dummy query ID.
@@ -346,7 +345,6 @@ class QueryManagerTest : public ::testing::Test {
unique_ptr<QueryHandle> query_handle_;
unique_ptr<QueryManager> query_manager_;
- // unique_ptr<Foreman> foreman_;
MessageBusImpl bus_;
client_id worker_client_id_;
@@ -357,7 +355,6 @@ class QueryManagerTest : public ::testing::Test {
TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
// This test creates a DAG of a single node. No workorders are generated.
query_plan_->addRelationalOperator(new MockOperator(false, false));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(0));
@@ -377,7 +374,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
// This test creates a DAG of a single node. Static workorders are generated.
const QueryPlan::DAGNodeIndex id =
query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -429,7 +425,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
// scaffolding of mocking. If we use gMock, we can do much better.
const QueryPlan::DAGNodeIndex id =
query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
- // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(id));