You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/12 16:06:57 UTC
[40/50] [abbrv] incubator-quickstep git commit: Minor updates to
Shiftboss.
Minor updates to Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e443b2b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e443b2b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e443b2b8
Branch: refs/heads/travis-grpc
Commit: e443b2b8409a128cc5ba2bdf1a6d01ebf79e7e74
Parents: bd01748
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 10:33:10 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 10:33:10 2016 -0700
----------------------------------------------------------------------
query_execution/QueryExecutionMessages.proto | 1 +
query_execution/Shiftboss.cpp | 89 ++++++++++++++++-------
query_execution/Shiftboss.hpp | 3 +
3 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index f6a8b73..f680d35 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -87,6 +87,7 @@ message ShiftbossRegistrationMessage {
}
message ShiftbossRegistrationResponseMessage {
+ required uint64 shiftboss_index = 1;
}
message QueryInitiateMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 120e8fb..24c91fe 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -56,6 +56,7 @@ using std::string;
using std::unique_ptr;
using std::vector;
+using tmb::MessageBus;
using tmb::TaggedMessage;
namespace quickstep {
@@ -78,6 +79,13 @@ void Shiftboss::run() {
switch (annotated_message.tagged_message.message_type()) {
case kShiftbossRegistrationResponseMessage: {
foreman_client_id_ = annotated_message.sender;
+
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::ShiftbossRegistrationResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ shiftboss_index_ = proto.shiftboss_index();
break;
}
case kQueryInitiateMessage: {
@@ -117,10 +125,14 @@ void Shiftboss::run() {
<< "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
<< "') from Foreman to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
break;
}
case kInitiateRebuildMessage: {
@@ -147,10 +159,14 @@ void Shiftboss::run() {
<< "' message from worker (client " << annotated_message.sender << ") to Foreman";
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(annotated_message.tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(annotated_message.tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kSaveQueryResultMessage: {
@@ -178,10 +194,14 @@ void Shiftboss::run() {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
<< "') to Foreman";
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kPoisonMessage: {
@@ -192,12 +212,14 @@ void Shiftboss::run() {
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
- tmb::MessageBus::SendStatus send_status =
+ const MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_,
worker_addresses_,
broadcast_style,
move(annotated_message.tagged_message));
- DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to All workers";
return;
}
default: {
@@ -280,10 +302,14 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
}
void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -307,8 +333,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
proto.set_query_id(query_id);
proto.set_operator_index(op_index);
proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
- // TODO(zuyu): Multiple Shiftboss support.
- proto.set_shiftboss_index(0);
+ proto.set_shiftboss_index(shiftboss_index_);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -319,10 +344,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
// NOTE(zuyu): Worker releases the memory after the execution of
@@ -347,10 +376,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
<< "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
<< "') to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 32d2408..9464a4d 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -223,6 +223,9 @@ class Shiftboss : public Thread {
tmb::client_id shiftboss_client_id_, foreman_client_id_;
+ // Unique per Shiftboss instance.
+ std::uint64_t shiftboss_index_;
+
// TMB recipients for all workers managed by this Shiftboss.
tmb::Address worker_addresses_;