You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/06/27 21:33:37 UTC

[02/18] incubator-quickstep git commit: Measure execution time of WorkOrders.

Measure execution time of WorkOrders.

- Measure order execution time for both normal and rebuild work
  orders.
- Split the WorkOrderCompletion proto message in two: Normal and Rebuild
  work orders, with a possibility that in the future the two messages
  will have different fields.
- Add execution time field in both WorkOrderComplete proto messages.
- Include the recorded time in both WorkOrderComplete proto messages
  that are sent back to Foreman.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8e825f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8e825f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8e825f15

Branch: refs/heads/travis-grpc
Commit: 8e825f1525d5080e6c4bd02ee2ce0e14d81ab6c5
Parents: a701362
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 14 14:02:45 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 15 15:59:35 2016 -0500

----------------------------------------------------------------------
 query_execution/PolicyEnforcer.cpp              | 23 ++++-
 query_execution/QueryExecutionMessages.proto    | 29 +++++--
 query_execution/QueryManager.cpp                |  4 +-
 query_execution/Worker.cpp                      | 89 ++++++++++++--------
 query_execution/Worker.hpp                      | 38 +++++++--
 query_execution/tests/QueryManager_unittest.cpp |  9 +-
 6 files changed, 132 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 4501026..9f0502d 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -67,13 +67,26 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
   // TaggedMessage only once.
   std::size_t query_id;
   switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage:  // Fall through.
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      worker_directory_->decrementNumQueuedWorkOrders(
+          proto.worker_thread_index());
+      break;
+    }
     case kRebuildWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // rebuild WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
-      worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
+      worker_directory_->decrementNumQueuedWorkOrders(
+          proto.worker_thread_index());
       break;
     }
     case kCatalogRelationNewBlockMessage: {
@@ -98,7 +111,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       break;
     }
     case kWorkOrderFeedbackMessage: {
-      WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
       query_id = msg.header().query_id;
       break;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 9d9a9e5..65a8946 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,14 +20,33 @@ package quickstep.serialization;
 message EmptyMessage {
 }
 
-// Used for both Normal WorkOrders and RebuildWorkOrders.
-// NOTE(zuyu): we might need to seperate the completion messages to contain
-// run-time information for Foreman to make better decisions on scheduling
-// WorkOrders.
-message WorkOrderCompletionMessage {
+// Note: There are different types of completion messages for normal work orders
+// rebuild work orders. This can be potentially helpful when we want to collect
+// different statistics for executing different types of work orders. 
+// e.g. In select normal work order completion message, we could be interested 
+// in the selectivity of the block whose work order got execute. In rebuild work 
+// order completion message, we may be interested in adding the compression 
+// ratio or dictionary size of the rebuilt block.
+
+// TODO(harshad) : If there are different fields in the two message types below, 
+// create a base message class called WorkOrderCompletionMessage and make the
+// two classes below extend the base class. All the common fields in both the
+// classes can be moved to the base class. 
+
+// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
+message NormalWorkOrderCompletionMessage {
+  required uint64 operator_index = 1;
+  required uint64 worker_thread_index = 2;
+  required uint64 query_id = 3;
+  optional uint64 execution_time_in_microseconds = 4;
+}
+
+// A message sent upon completion of a rebuild WorkOrder execution.
+message RebuildWorkOrderCompletionMessage {
   required uint64 operator_index = 1;
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
+  optional uint64 execution_time_in_microseconds = 4;
 }
 
 message CatalogRelationNewBlockMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index e4e4c9d..d20b592 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -161,7 +161,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
   dag_node_index op_index;
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::NormalWorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
 
@@ -170,7 +170,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::RebuildWorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ef596e1..ae889c7 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -16,7 +16,9 @@
 
 #include "query_execution/Worker.hpp"
 
+#include <chrono>
 #include <cstddef>
+#include <cstdint>
 #include <cstdlib>
 #include <utility>
 
@@ -36,6 +38,7 @@
 #include "tmb/tagged_message.h"
 
 using std::size_t;
+using std::uint64_t;
 
 using tmb::TaggedMessage;
 
@@ -50,22 +53,27 @@ void Worker::run() {
   for (;;) {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
-    const AnnotatedMessage annotated_msg = bus_->Receive(worker_client_id_, 0, true);
+    const AnnotatedMessage annotated_msg =
+        bus_->Receive(worker_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
     switch (tagged_message.message_type()) {
-      case kWorkOrderMessage:  // Fall through.
+      case kWorkOrderMessage: {
+        serialization::NormalWorkOrderCompletionMessage proto;
+        executeWorkOrderHelper<serialization::NormalWorkOrderCompletionMessage>(
+            tagged_message, &proto);
+        sendWorkOrderCompleteMessage<
+            serialization::NormalWorkOrderCompletionMessage>(
+            annotated_msg.sender, proto, kWorkOrderCompleteMessage);
+        break;
+      }
       case kRebuildWorkOrderMessage: {
-        WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
-        DCHECK(message.getWorkOrder() != nullptr);
-        message.getWorkOrder()->execute();
-        const std::size_t query_id_for_workorder =
-            message.getWorkOrder()->getQueryID();
-        delete message.getWorkOrder();
-
-        sendWorkOrderCompleteMessage(
-            annotated_msg.sender, message.getRelationalOpIndex(),
-            query_id_for_workorder,
-            tagged_message.message_type() == kRebuildWorkOrderMessage);
+        serialization::RebuildWorkOrderCompletionMessage proto;
+        executeWorkOrderHelper<
+            serialization::RebuildWorkOrderCompletionMessage>(tagged_message,
+                                                              &proto);
+        sendWorkOrderCompleteMessage<
+            serialization::RebuildWorkOrderCompletionMessage>(
+            annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
         break;
       }
       case kPoisonMessage: {
@@ -77,34 +85,49 @@ void Worker::run() {
   }
 }
 
+template <typename CompletionMessageProtoT>
 void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                          const size_t op_index,
-                                          const size_t query_id,
-                                          const bool is_rebuild_work_order) {
-  serialization::WorkOrderCompletionMessage proto;
-  proto.set_operator_index(op_index);
-  proto.set_worker_thread_index(worker_thread_index_);
-  proto.set_query_id(query_id);
-
+                                          const CompletionMessageProtoT &proto,
+                                          const message_type_id message_type) {
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+  char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        is_rebuild_work_order ? kRebuildWorkOrderCompleteMessage
-                                              : kWorkOrderCompleteMessage);
+  TaggedMessage tagged_message(
+      static_cast<const void *>(proto_bytes), proto_length, message_type);
   std::free(proto_bytes);
 
   const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         worker_client_id_,
-                                         receiver,
-                                         std::move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
-      "be sent from worker with TMB client ID " << worker_client_id_ << " to "
-      "Foreman with TMB client ID " << receiver;
+      QueryExecutionUtil::SendTMBMessage(
+          bus_, worker_client_id_, receiver, std::move(tagged_message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from worker with TMB client ID "
+      << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+}
+
+template <typename CompletionMessageProtoT>
+void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
+                                    CompletionMessageProtoT *proto) {
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+  WorkerMessage worker_message(
+      *static_cast<const WorkerMessage *>(tagged_message.message()));
+  DCHECK(worker_message.getWorkOrder() != nullptr);
+  const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
+
+  // Start measuring the execution time.
+  start = std::chrono::steady_clock::now();
+  worker_message.getWorkOrder()->execute();
+  end = std::chrono::steady_clock::now();
+  delete worker_message.getWorkOrder();
+  const uint64_t execution_time_microseconds =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
+          .count();
+  // Construct the proto message.
+  proto->set_operator_index(worker_message.getRelationalOpIndex());
+  proto->set_query_id(query_id_for_workorder);
+  proto->set_worker_thread_index(worker_thread_index_);
+  proto->set_execution_time_in_microseconds(execution_time_microseconds);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index c0bafdc..71d3ec9 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -19,6 +19,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
 
 #include <cstddef>
+#include <cstdint>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "threading/Thread.hpp"
@@ -27,6 +28,8 @@
 #include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 
+namespace tmb { class TaggedMessge; }
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -93,18 +96,35 @@ class Worker : public Thread {
 
  private:
   /**
-   * @brief Send the response WorkOrder completion message.
+   * @brief A helper method to execute the WorkOrder and construct a
+   *        completion message.
+   *
+   * @note CompletionMessageProtoT is the type of the completion message.
+   * @note Right now a single helper method works for all message types.
+   *       If different message types need to collect different statistics for
+   *       the WorkOrder execution, we need to create different helper methods,
+   *       one for each message type.
+   *
+   * @param tagged_message The TaggedMessage which consists of the WorkOrder.
+   * @param proto The proto message to be sent.
+   **/
+  template <typename CompletionMessageProtoT>
+  void executeWorkOrderHelper(const TaggedMessage &tagged_message,
+                              CompletionMessageProtoT *proto);
+
+  /**
+   * @brief A helper method to send the WorkOrder completion message.
+   *
+   * @note CompletionMessageProtoT is the type of the completion message.
    *
-   * @param receiver The id of the TMB client which should receive the response.
-   * @param op_index The index of the operator to which the WorkOrder belongs.
-   * @param query_id The ID of the query which the WorkOrder belongs to.
-   * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
-   *        false.
+   * @param receiver The TMB client ID of the receiver.
+   * @param proto The proto message to be sent.
+   * @param message_type The ID of the type of the message being sent.
    **/
+  template <typename CompletionMessageProtoT>
   void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                    const std::size_t op_index,
-                                    const std::size_t query_id,
-                                    const bool is_rebuild_work_order);
+                                    const CompletionMessageProtoT &proto,
+                                    const message_type_id message_type);
 
   const std::size_t worker_thread_index_;
   MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e825f15/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 4f98748..37e2cdd 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -275,7 +275,7 @@ class QueryManagerTest : public ::testing::Test {
   inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
     TaggedMessage tagged_message;
-    serialization::WorkOrderCompletionMessage proto;
+    serialization::NormalWorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker ID.
     proto.set_query_id(0);  // dummy query ID.
@@ -296,8 +296,7 @@ class QueryManagerTest : public ::testing::Test {
 
   inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
-    // foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
-    serialization::WorkOrderCompletionMessage proto;
+    serialization::RebuildWorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker thread ID.
     proto.set_query_id(0);  // dummy query ID.
@@ -346,7 +345,6 @@ class QueryManagerTest : public ::testing::Test {
   unique_ptr<QueryHandle> query_handle_;
   unique_ptr<QueryManager> query_manager_;
 
-  // unique_ptr<Foreman> foreman_;
   MessageBusImpl bus_;
 
   client_id worker_client_id_;
@@ -357,7 +355,6 @@ class QueryManagerTest : public ::testing::Test {
 TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
   // This test creates a DAG of a single node. No workorders are generated.
   query_plan_->addRelationalOperator(new MockOperator(false, false));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(0));
@@ -377,7 +374,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
   // This test creates a DAG of a single node. Static workorders are generated.
   const QueryPlan::DAGNodeIndex id =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -429,7 +425,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
   // scaffolding of mocking. If we use gMock, we can do much better.
   const QueryPlan::DAGNodeIndex id =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(id));