You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/08 17:34:00 UTC

incubator-quickstep git commit: Minor updates to Shiftboss.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master bd017482e -> e443b2b84


Minor updates to Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e443b2b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e443b2b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e443b2b8

Branch: refs/heads/master
Commit: e443b2b8409a128cc5ba2bdf1a6d01ebf79e7e74
Parents: bd01748
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 10:33:10 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 10:33:10 2016 -0700

----------------------------------------------------------------------
 query_execution/QueryExecutionMessages.proto |  1 +
 query_execution/Shiftboss.cpp                | 89 ++++++++++++++++-------
 query_execution/Shiftboss.hpp                |  3 +
 3 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index f6a8b73..f680d35 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -87,6 +87,7 @@ message ShiftbossRegistrationMessage {
 }
 
 message ShiftbossRegistrationResponseMessage {
+  required uint64 shiftboss_index = 1;
 }
 
 message QueryInitiateMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 120e8fb..24c91fe 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -56,6 +56,7 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
+using tmb::MessageBus;
 using tmb::TaggedMessage;
 
 namespace quickstep {
@@ -78,6 +79,13 @@ void Shiftboss::run() {
     switch (annotated_message.tagged_message.message_type()) {
       case kShiftbossRegistrationResponseMessage: {
         foreman_client_id_ = annotated_message.sender;
+
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::ShiftbossRegistrationResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        shiftboss_index_ = proto.shiftboss_index();
         break;
       }
       case kQueryInitiateMessage: {
@@ -117,10 +125,14 @@ void Shiftboss::run() {
                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
                   << "') from Foreman to worker " << worker_index;
 
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           workers_->getClientID(worker_index),
-                                           move(worker_tagged_message));
+        const MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               workers_->getClientID(worker_index),
+                                               move(worker_tagged_message));
+        CHECK(send_status == MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
         break;
       }
       case kInitiateRebuildMessage: {
@@ -147,10 +159,14 @@ void Shiftboss::run() {
                   << "' message from worker (client " << annotated_message.sender << ") to Foreman";
 
         DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           foreman_client_id_,
-                                           move(annotated_message.tagged_message));
+        const MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               foreman_client_id_,
+                                               move(annotated_message.tagged_message));
+        CHECK(send_status == MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Foreman with TMB client ID " << foreman_client_id_;
         break;
       }
       case kSaveQueryResultMessage: {
@@ -178,10 +194,14 @@ void Shiftboss::run() {
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
                   << "') to Foreman";
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           foreman_client_id_,
-                                           move(message_response));
+        const MessageBus::SendStatus send_status =
+            QueryExecutionUtil::SendTMBMessage(bus_,
+                                               shiftboss_client_id_,
+                                               foreman_client_id_,
+                                               move(message_response));
+        CHECK(send_status == MessageBus::SendStatus::kOK)
+            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to Foreman with TMB client ID " << foreman_client_id_;
         break;
       }
       case kPoisonMessage: {
@@ -192,12 +212,14 @@ void Shiftboss::run() {
         tmb::MessageStyle broadcast_style;
         broadcast_style.Broadcast(true);
 
-        tmb::MessageBus::SendStatus send_status =
+        const MessageBus::SendStatus send_status =
             bus_->Send(shiftboss_client_id_,
                        worker_addresses_,
                        broadcast_style,
                        move(annotated_message.tagged_message));
-        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+        CHECK(send_status == MessageBus::SendStatus::kOK)
+            << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
+            << " to All workers";
         return;
       }
       default: {
@@ -280,10 +302,14 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     shiftboss_client_id_,
-                                     foreman_client_id_,
-                                     move(message_response));
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         shiftboss_client_id_,
+                                         foreman_client_id_,
+                                         move(message_response));
+  CHECK(send_status == MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+      << " to Foreman with TMB client ID " << foreman_client_id_;
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -307,8 +333,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
   proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
-  // TODO(zuyu): Multiple Shiftboss support.
-  proto.set_shiftboss_index(0);
+  proto.set_shiftboss_index(shiftboss_index_);
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -319,10 +344,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     shiftboss_client_id_,
-                                     foreman_client_id_,
-                                     move(message_response));
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         shiftboss_client_id_,
+                                         foreman_client_id_,
+                                         move(message_response));
+  CHECK(send_status == MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+      << " to Foreman with TMB client ID " << foreman_client_id_;
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of
@@ -347,10 +376,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
               << "') to worker " << worker_index;
 
-    QueryExecutionUtil::SendTMBMessage(bus_,
-                                       shiftboss_client_id_,
-                                       workers_->getClientID(worker_index),
-                                       move(worker_tagged_message));
+    const MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           workers_->getClientID(worker_index),
+                                           move(worker_tagged_message));
+    CHECK(send_status == MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+        << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 32d2408..9464a4d 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -223,6 +223,9 @@ class Shiftboss : public Thread {
 
   tmb::client_id shiftboss_client_id_, foreman_client_id_;
 
+  // Unique per Shiftboss instance.
+  std::uint64_t shiftboss_index_;
+
   // TMB recipients for all workers managed by this Shiftboss.
   tmb::Address worker_addresses_;