You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:04 UTC

[12/50] incubator-quickstep git commit: Refactor Shiftboss for better debug info.

Refactor Shiftboss for better debug info.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/365fff6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/365fff6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/365fff6a

Branch: refs/heads/quickstep_partition_parser_support
Commit: 365fff6a3371f6516488a808cc79e929ff789b4a
Parents: 9a005f3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Nov 17 15:02:17 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Nov 17 15:02:17 2016 -0800

----------------------------------------------------------------------
 query_execution/Shiftboss.cpp | 56 ++++++++++++++++++++------------------
 query_execution/Shiftboss.hpp |  2 ++
 2 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/365fff6a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index a434527..09d7846 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -56,6 +56,7 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
+using tmb::AnnotatedMessage;
 using tmb::MessageBus;
 using tmb::TaggedMessage;
 
@@ -69,25 +70,16 @@ void Shiftboss::run() {
     ThreadUtil::BindToCPU(cpu_id_);
   }
 
+  processShiftbossRegistrationResponseMessage();
+
   for (;;) {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
     AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                << "') received the typed '" << annotated_message.tagged_message.message_type()
                << "' message from client " << annotated_message.sender;
     switch (annotated_message.tagged_message.message_type()) {
-      case kShiftbossRegistrationResponseMessage: {
-        foreman_client_id_ = annotated_message.sender;
-
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::ShiftbossRegistrationResponseMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        shiftboss_index_ = proto.shiftboss_index();
-        break;
-      }
       case kQueryInitiateMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
@@ -121,7 +113,7 @@ void Shiftboss::run() {
                                             kWorkOrderMessage);
 
         const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                    << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
                    << "') from Foreman to worker " << worker_index;
 
@@ -152,7 +144,7 @@ void Shiftboss::run() {
       case kWorkOrderFeedbackMessage:
       case kWorkOrderCompleteMessage:
       case kRebuildWorkOrderCompleteMessage: {
-        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                    << "') forwarded typed '" << annotated_message.tagged_message.message_type()
                    << "' message from Worker with TMB client ID '" << annotated_message.sender
                    << "' to Foreman with TMB client ID " << foreman_client_id_;
@@ -203,7 +195,7 @@ void Shiftboss::run() {
                                        kSaveQueryResultResponseMessage);
         free(proto_response_bytes);
 
-        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                    << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
                    << "') to Foreman with TMB client ID " << foreman_client_id_;
         const MessageBus::SendStatus send_status =
@@ -215,7 +207,7 @@ void Shiftboss::run() {
         break;
       }
       case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                    << "') forwarded PoisonMessage (typed '" << kPoisonMessage
                    << "') from Foreman to all workers";
 
@@ -271,7 +263,7 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
              << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
              << "') to all";
   tmb::MessageBus::SendStatus send_status =
@@ -279,18 +271,30 @@ void Shiftboss::registerWithForeman() {
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
+void Shiftboss::processShiftbossRegistrationResponseMessage() {
+  const AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
+
+  foreman_client_id_ = annotated_message.sender;
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') received the typed '" << kShiftbossRegistrationResponseMessage
+             << "' message from ForemanDistributed with client " << foreman_client_id_;
+
+  serialization::ShiftbossRegistrationResponseMessage proto;
+  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  shiftboss_index_ = proto.shiftboss_index();
+}
+
 void Shiftboss::processQueryInitiateMessage(
     const std::size_t query_id,
     const serialization::CatalogDatabase &catalog_database_cache_proto,
     const serialization::QueryContext &query_context_proto) {
   database_cache_.update(catalog_database_cache_proto);
 
-  unique_ptr<QueryContext> query_context(
-      new QueryContext(query_context_proto,
-                       database_cache_,
-                       storage_manager_,
-                       shiftboss_client_id_,
-                       bus_));
+  auto query_context = std::make_unique<QueryContext>(
+      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
   query_contexts_.emplace(query_id, move(query_context));
 
   serialization::QueryInitiateResponseMessage proto;
@@ -305,7 +309,7 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
              << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
@@ -344,7 +348,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
              << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
@@ -373,7 +377,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
                << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
                << "') to worker " << worker_index;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/365fff6a/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 94b10a2..442e61e 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -191,6 +191,8 @@ class Shiftboss : public Thread {
  private:
   void registerWithForeman();
 
+  void processShiftbossRegistrationResponseMessage();
+
   /**
    * @brief Process the Shiftboss initiate message and ack back.
    *