You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:06 UTC
[14/50] incubator-quickstep git commit: Unified
WorkOrderCompletionMessage.
Unified WorkOrderCompletionMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/178ed4b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/178ed4b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/178ed4b3
Branch: refs/heads/quickstep_partition_parser_support
Commit: 178ed4b3241d5c16736459f9e9073922a03d99b2
Parents: 7095987
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Nov 12 16:44:39 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Nov 18 13:21:38 2016 -0800
----------------------------------------------------------------------
query_execution/PolicyEnforcerBase.cpp | 6 ++---
query_execution/PolicyEnforcerBase.hpp | 4 +--
query_execution/QueryExecutionMessages.proto | 32 ++++++++---------------
query_execution/Worker.cpp | 28 +++++++++-----------
query_execution/Worker.hpp | 13 +++++----
5 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 745ded6..4e8c782 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -45,7 +45,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
+ serialization::WorkOrderCompletionMessage proto;
// Note: This proto message contains the time it took to execute the
// WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -64,7 +64,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
break;
}
case kRebuildWorkOrderCompleteMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
+ serialization::WorkOrderCompletionMessage proto;
// Note: This proto message contains the time it took to execute the
// rebuild WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -157,7 +157,7 @@ bool PolicyEnforcerBase::admitQueries(
}
void PolicyEnforcerBase::recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto) {
+ const serialization::WorkOrderCompletionMessage &proto) {
const std::size_t query_id = proto.query_id();
std::vector<WorkOrderTimeEntry> &workorder_time_entries
= workorder_time_recorder_[query_id];
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index ea2c06f..7009a0a 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -38,7 +38,7 @@ namespace quickstep {
class CatalogDatabaseLite;
class QueryHandle;
-namespace serialization { class NormalWorkOrderCompletionMessage; }
+namespace serialization { class WorkOrderCompletionMessage; }
/** \addtogroup QueryExecution
* @{
@@ -165,7 +165,7 @@ class PolicyEnforcerBase {
* execution.
**/
void recordTimeForWorkOrder(
- const serialization::NormalWorkOrderCompletionMessage &proto);
+ const serialization::WorkOrderCompletionMessage &proto);
CatalogDatabaseLite *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 1a2cb78..165a194 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -31,31 +31,21 @@ import "relational_operators/WorkOrder.proto";
// order completion message, we may be interested in adding the compression
// ratio or dictionary size of the rebuilt block.
-// TODO(harshad) : If there are different fields in the two message types below,
-// create a base message class called WorkOrderCompletionMessage and make the
-// two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class.
+message WorkOrderCompletionMessage {
+ enum WorkOrderType {
+ NORMAL = 0;
+ REBUILD = 1;
+ }
-// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
-message NormalWorkOrderCompletionMessage {
- required uint64 operator_index = 1;
- required uint64 worker_thread_index = 2;
- required uint64 query_id = 3;
-
- // Epoch time in microseconds.
- optional uint64 execution_start_time = 4;
- optional uint64 execution_end_time = 5;
-}
+ required WorkOrderType work_order_type = 1;
-// A message sent upon completion of a rebuild WorkOrder execution.
-message RebuildWorkOrderCompletionMessage {
- required uint64 operator_index = 1;
- required uint64 worker_thread_index = 2;
- required uint64 query_id = 3;
+ required uint64 operator_index = 2;
+ required uint64 worker_thread_index = 3;
+ required uint64 query_id = 4;
// Epoch time in microseconds.
- optional uint64 execution_start_time = 4;
- optional uint64 execution_end_time = 5;
+ optional uint64 execution_start_time = 5;
+ optional uint64 execution_end_time = 6;
}
message CatalogRelationNewBlockMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 0b1efba..0db17b4 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -47,6 +47,8 @@ using tmb::TaggedMessage;
namespace quickstep {
+using serialization::WorkOrderCompletionMessage;
+
void Worker::run() {
if (cpu_id_ >= 0) {
ThreadUtil::BindToCPU(cpu_id_);
@@ -61,21 +63,16 @@ void Worker::run() {
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
switch (tagged_message.message_type()) {
case kWorkOrderMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
- executeWorkOrderHelper<serialization::NormalWorkOrderCompletionMessage>(
- tagged_message, &proto);
- sendWorkOrderCompleteMessage<
- serialization::NormalWorkOrderCompletionMessage>(
+ WorkOrderCompletionMessage proto;
+ executeWorkOrderHelper(tagged_message, &proto);
+ sendWorkOrderCompleteMessage(
annotated_msg.sender, proto, kWorkOrderCompleteMessage);
break;
}
case kRebuildWorkOrderMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
- executeWorkOrderHelper<
- serialization::RebuildWorkOrderCompletionMessage>(tagged_message,
- &proto);
- sendWorkOrderCompleteMessage<
- serialization::RebuildWorkOrderCompletionMessage>(
+ WorkOrderCompletionMessage proto;
+ executeWorkOrderHelper(tagged_message, &proto, true /* is_rebuild */);
+ sendWorkOrderCompleteMessage(
annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
break;
}
@@ -88,9 +85,8 @@ void Worker::run() {
}
}
-template <typename CompletionMessageProtoT>
void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const CompletionMessageProtoT &proto,
+ const WorkOrderCompletionMessage &proto,
const message_type_id message_type) {
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
@@ -109,9 +105,9 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
-template <typename CompletionMessageProtoT>
void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
- CompletionMessageProtoT *proto) {
+ WorkOrderCompletionMessage *proto,
+ const bool is_rebuild_work_order) {
std::chrono::time_point<std::chrono::steady_clock> start, end;
WorkerMessage worker_message(
*static_cast<const WorkerMessage *>(tagged_message.message()));
@@ -133,6 +129,8 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
end.time_since_epoch()).count();
// Construct the proto message.
+ proto->set_work_order_type(is_rebuild_work_order ? WorkOrderCompletionMessage::REBUILD
+ : WorkOrderCompletionMessage::NORMAL);
proto->set_operator_index(worker_message.getRelationalOpIndex());
proto->set_query_id(query_id_for_workorder);
proto->set_worker_thread_index(worker_thread_index_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index aa39bb3..fec2242 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -34,6 +34,8 @@ namespace tmb { class TaggedMessge; }
namespace quickstep {
+namespace serialization { class WorkOrderCompletionMessage; }
+
/** \addtogroup QueryExecution
* @{
*/
@@ -100,7 +102,6 @@ class Worker : public Thread {
* @brief A helper method to execute the WorkOrder and construct a
* completion message.
*
- * @note CompletionMessageProtoT is the type of the completion message.
* @note Right now a single helper method works for all message types.
* If different message types need to collect different statistics for
* the WorkOrder execution, we need to create different helper methods,
@@ -108,23 +109,21 @@ class Worker : public Thread {
*
* @param tagged_message The TaggedMessage which consists of the WorkOrder.
* @param proto The proto message to be sent.
+ * @param is_rebuild_work_order Whether it is used for a RebuildWorkOrder.
**/
- template <typename CompletionMessageProtoT>
void executeWorkOrderHelper(const TaggedMessage &tagged_message,
- CompletionMessageProtoT *proto);
+ serialization::WorkOrderCompletionMessage *proto,
+ const bool is_rebuild_work_order = false);
/**
* @brief A helper method to send the WorkOrder completion message.
*
- * @note CompletionMessageProtoT is the type of the completion message.
- *
* @param receiver The TMB client ID of the receiver.
* @param proto The proto message to be sent.
* @param message_type The ID of the type of the message being sent.
**/
- template <typename CompletionMessageProtoT>
void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const CompletionMessageProtoT &proto,
+ const serialization::WorkOrderCompletionMessage &proto,
const message_type_id message_type);
const std::size_t worker_thread_index_;