You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:04 UTC
[12/50] incubator-quickstep git commit: Refactor Shiftboss for better debug info.
Refactor Shiftboss for better debug info.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/365fff6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/365fff6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/365fff6a
Branch: refs/heads/quickstep_partition_parser_support
Commit: 365fff6a3371f6516488a808cc79e929ff789b4a
Parents: 9a005f3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Nov 17 15:02:17 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Nov 17 15:02:17 2016 -0800
----------------------------------------------------------------------
query_execution/Shiftboss.cpp | 56 ++++++++++++++++++++------------------
query_execution/Shiftboss.hpp | 2 ++
2 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/365fff6a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index a434527..09d7846 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -56,6 +56,7 @@ using std::string;
using std::unique_ptr;
using std::vector;
+using tmb::AnnotatedMessage;
using tmb::MessageBus;
using tmb::TaggedMessage;
@@ -69,25 +70,16 @@ void Shiftboss::run() {
ThreadUtil::BindToCPU(cpu_id_);
}
+ processShiftbossRegistrationResponseMessage();
+
for (;;) {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') received the typed '" << annotated_message.tagged_message.message_type()
<< "' message from client " << annotated_message.sender;
switch (annotated_message.tagged_message.message_type()) {
- case kShiftbossRegistrationResponseMessage: {
- foreman_client_id_ = annotated_message.sender;
-
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::ShiftbossRegistrationResponseMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- shiftboss_index_ = proto.shiftboss_index();
- break;
- }
case kQueryInitiateMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
@@ -121,7 +113,7 @@ void Shiftboss::run() {
kWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
<< "') from Foreman to worker " << worker_index;
@@ -152,7 +144,7 @@ void Shiftboss::run() {
case kWorkOrderFeedbackMessage:
case kWorkOrderCompleteMessage:
case kRebuildWorkOrderCompleteMessage: {
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
<< "' message from Worker with TMB client ID '" << annotated_message.sender
<< "' to Foreman with TMB client ID " << foreman_client_id_;
@@ -203,7 +195,7 @@ void Shiftboss::run() {
kSaveQueryResultResponseMessage);
free(proto_response_bytes);
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
<< "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
@@ -215,7 +207,7 @@ void Shiftboss::run() {
break;
}
case kPoisonMessage: {
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') forwarded PoisonMessage (typed '" << kPoisonMessage
<< "') from Foreman to all workers";
@@ -271,7 +263,7 @@ void Shiftboss::registerWithForeman() {
kShiftbossRegistrationMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
<< "') to all";
tmb::MessageBus::SendStatus send_status =
@@ -279,18 +271,30 @@ void Shiftboss::registerWithForeman() {
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
+void Shiftboss::processShiftbossRegistrationResponseMessage() {
+ const AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
+
+ foreman_client_id_ = annotated_message.sender;
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') received the typed '" << kShiftbossRegistrationResponseMessage
+ << "' message from ForemanDistributed with client " << foreman_client_id_;
+
+ serialization::ShiftbossRegistrationResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ shiftboss_index_ = proto.shiftboss_index();
+}
+
void Shiftboss::processQueryInitiateMessage(
const std::size_t query_id,
const serialization::CatalogDatabase &catalog_database_cache_proto,
const serialization::QueryContext &query_context_proto) {
database_cache_.update(catalog_database_cache_proto);
- unique_ptr<QueryContext> query_context(
- new QueryContext(query_context_proto,
- database_cache_,
- storage_manager_,
- shiftboss_client_id_,
- bus_));
+ auto query_context = std::make_unique<QueryContext>(
+ query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
query_contexts_.emplace(query_id, move(query_context));
serialization::QueryInitiateResponseMessage proto;
@@ -305,7 +309,7 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
<< "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
@@ -344,7 +348,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
<< "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
@@ -373,7 +377,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kRebuildWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
<< "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
<< "') to worker " << worker_index;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/365fff6a/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 94b10a2..442e61e 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -191,6 +191,8 @@ class Shiftboss : public Thread {
private:
void registerWithForeman();
+ void processShiftbossRegistrationResponseMessage();
+
/**
* @brief Process the Shiftboss initiate message and ack back.
*