You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:06 UTC

[14/50] incubator-quickstep git commit: Unified WorkOrderCompletionMessage.

Unified WorkOrderCompletionMessage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/178ed4b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/178ed4b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/178ed4b3

Branch: refs/heads/quickstep_partition_parser_support
Commit: 178ed4b3241d5c16736459f9e9073922a03d99b2
Parents: 7095987
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Nov 12 16:44:39 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Nov 18 13:21:38 2016 -0800

----------------------------------------------------------------------
 query_execution/PolicyEnforcerBase.cpp       |  6 ++---
 query_execution/PolicyEnforcerBase.hpp       |  4 +--
 query_execution/QueryExecutionMessages.proto | 32 ++++++++---------------
 query_execution/Worker.cpp                   | 28 +++++++++-----------
 query_execution/Worker.hpp                   | 13 +++++----
 5 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 745ded6..4e8c782 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -45,7 +45,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
 
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
-      serialization::NormalWorkOrderCompletionMessage proto;
+      serialization::WorkOrderCompletionMessage proto;
       // Note: This proto message contains the time it took to execute the
       // WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -64,7 +64,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
-      serialization::RebuildWorkOrderCompletionMessage proto;
+      serialization::WorkOrderCompletionMessage proto;
       // Note: This proto message contains the time it took to execute the
       // rebuild WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
@@ -157,7 +157,7 @@ bool PolicyEnforcerBase::admitQueries(
 }
 
 void PolicyEnforcerBase::recordTimeForWorkOrder(
-    const serialization::NormalWorkOrderCompletionMessage &proto) {
+    const serialization::WorkOrderCompletionMessage &proto) {
   const std::size_t query_id = proto.query_id();
   std::vector<WorkOrderTimeEntry> &workorder_time_entries
       = workorder_time_recorder_[query_id];

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index ea2c06f..7009a0a 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -38,7 +38,7 @@ namespace quickstep {
 class CatalogDatabaseLite;
 class QueryHandle;
 
-namespace serialization { class NormalWorkOrderCompletionMessage; }
+namespace serialization { class WorkOrderCompletionMessage; }
 
 /** \addtogroup QueryExecution
  *  @{
@@ -165,7 +165,7 @@ class PolicyEnforcerBase {
    *        execution.
    **/
   void recordTimeForWorkOrder(
-      const serialization::NormalWorkOrderCompletionMessage &proto);
+      const serialization::WorkOrderCompletionMessage &proto);
 
   CatalogDatabaseLite *catalog_database_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 1a2cb78..165a194 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -31,31 +31,21 @@ import "relational_operators/WorkOrder.proto";
 // order completion message, we may be interested in adding the compression
 // ratio or dictionary size of the rebuilt block.
 
-// TODO(harshad) : If there are different fields in the two message types below,
-// create a base message class called WorkOrderCompletionMessage and make the
-// two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class.
+message WorkOrderCompletionMessage {
+  enum WorkOrderType {
+    NORMAL = 0;
+    REBUILD = 1;
+  }
 
-// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
-message NormalWorkOrderCompletionMessage {
-  required uint64 operator_index = 1;
-  required uint64 worker_thread_index = 2;
-  required uint64 query_id = 3;
-
-  // Epoch time in microseconds.
-  optional uint64 execution_start_time = 4;
-  optional uint64 execution_end_time = 5;
-}
+  required WorkOrderType work_order_type = 1;
 
-// A message sent upon completion of a rebuild WorkOrder execution.
-message RebuildWorkOrderCompletionMessage {
-  required uint64 operator_index = 1;
-  required uint64 worker_thread_index = 2;
-  required uint64 query_id = 3;
+  required uint64 operator_index = 2;
+  required uint64 worker_thread_index = 3;
+  required uint64 query_id = 4;
 
   // Epoch time in microseconds.
-  optional uint64 execution_start_time = 4;
-  optional uint64 execution_end_time = 5;
+  optional uint64 execution_start_time = 5;
+  optional uint64 execution_end_time = 6;
 }
 
 message CatalogRelationNewBlockMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 0b1efba..0db17b4 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -47,6 +47,8 @@ using tmb::TaggedMessage;
 
 namespace quickstep {
 
+using serialization::WorkOrderCompletionMessage;
+
 void Worker::run() {
   if (cpu_id_ >= 0) {
     ThreadUtil::BindToCPU(cpu_id_);
@@ -61,21 +63,16 @@ void Worker::run() {
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
     switch (tagged_message.message_type()) {
       case kWorkOrderMessage: {
-        serialization::NormalWorkOrderCompletionMessage proto;
-        executeWorkOrderHelper<serialization::NormalWorkOrderCompletionMessage>(
-            tagged_message, &proto);
-        sendWorkOrderCompleteMessage<
-            serialization::NormalWorkOrderCompletionMessage>(
+        WorkOrderCompletionMessage proto;
+        executeWorkOrderHelper(tagged_message, &proto);
+        sendWorkOrderCompleteMessage(
             annotated_msg.sender, proto, kWorkOrderCompleteMessage);
         break;
       }
       case kRebuildWorkOrderMessage: {
-        serialization::RebuildWorkOrderCompletionMessage proto;
-        executeWorkOrderHelper<
-            serialization::RebuildWorkOrderCompletionMessage>(tagged_message,
-                                                              &proto);
-        sendWorkOrderCompleteMessage<
-            serialization::RebuildWorkOrderCompletionMessage>(
+        WorkOrderCompletionMessage proto;
+        executeWorkOrderHelper(tagged_message, &proto, true /* is_rebuild */);
+        sendWorkOrderCompleteMessage(
             annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
         break;
       }
@@ -88,9 +85,8 @@ void Worker::run() {
   }
 }
 
-template <typename CompletionMessageProtoT>
 void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                          const CompletionMessageProtoT &proto,
+                                          const WorkOrderCompletionMessage &proto,
                                           const message_type_id message_type) {
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t proto_length = proto.ByteSize();
@@ -109,9 +105,9 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
   CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
-template <typename CompletionMessageProtoT>
 void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
-                                    CompletionMessageProtoT *proto) {
+                                    WorkOrderCompletionMessage *proto,
+                                    const bool is_rebuild_work_order) {
   std::chrono::time_point<std::chrono::steady_clock> start, end;
   WorkerMessage worker_message(
       *static_cast<const WorkerMessage *>(tagged_message.message()));
@@ -133,6 +129,8 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
           end.time_since_epoch()).count();
 
   // Construct the proto message.
+  proto->set_work_order_type(is_rebuild_work_order ? WorkOrderCompletionMessage::REBUILD
+                                                   : WorkOrderCompletionMessage::NORMAL);
   proto->set_operator_index(worker_message.getRelationalOpIndex());
   proto->set_query_id(query_id_for_workorder);
   proto->set_worker_thread_index(worker_thread_index_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index aa39bb3..fec2242 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -34,6 +34,8 @@ namespace tmb { class TaggedMessge; }
 
 namespace quickstep {
 
+namespace serialization { class WorkOrderCompletionMessage; }
+
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -100,7 +102,6 @@ class Worker : public Thread {
    * @brief A helper method to execute the WorkOrder and construct a
    *        completion message.
    *
-   * @note CompletionMessageProtoT is the type of the completion message.
    * @note Right now a single helper method works for all message types.
    *       If different message types need to collect different statistics for
    *       the WorkOrder execution, we need to create different helper methods,
@@ -108,23 +109,21 @@ class Worker : public Thread {
    *
    * @param tagged_message The TaggedMessage which consists of the WorkOrder.
    * @param proto The proto message to be sent.
+   * @param is_rebuild_work_order Whether it is used for a RebuildWorkOrder.
    **/
-  template <typename CompletionMessageProtoT>
   void executeWorkOrderHelper(const TaggedMessage &tagged_message,
-                              CompletionMessageProtoT *proto);
+                              serialization::WorkOrderCompletionMessage *proto,
+                              const bool is_rebuild_work_order = false);
 
   /**
    * @brief A helper method to send the WorkOrder completion message.
    *
-   * @note CompletionMessageProtoT is the type of the completion message.
-   *
    * @param receiver The TMB client ID of the receiver.
    * @param proto The proto message to be sent.
    * @param message_type The ID of the type of the message being sent.
    **/
-  template <typename CompletionMessageProtoT>
   void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                    const CompletionMessageProtoT &proto,
+                                    const serialization::WorkOrderCompletionMessage &proto,
                                     const message_type_id message_type);
 
   const std::size_t worker_thread_index_;