You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/09 23:42:51 UTC
[9/9] incubator-quickstep git commit: Used two TMB implementations in
Shiftboss.
Used two TMB implementations in Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d8fc9461
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d8fc9461
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d8fc9461
Branch: refs/heads/two-level-tmb
Commit: d8fc9461b985ebbdc6c8feee3f3ce874de410f05
Parents: c40c553
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 12:48:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 15:42:36 2017 -0800
----------------------------------------------------------------------
cli/distributed/Executor.cpp | 7 +-
cli/distributed/Executor.hpp | 4 +
query_execution/Shiftboss.cpp | 334 ++++++++++---------
query_execution/Shiftboss.hpp | 79 +++--
.../DistributedExecutionGeneratorTestRunner.cpp | 8 +-
.../DistributedExecutionGeneratorTestRunner.hpp | 1 +
6 files changed, 229 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..5cc7df0 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -35,6 +35,7 @@
#include "tmb/id_typedefs.h"
#include "tmb/native_net_client_message_bus.h"
+#include "tmb/pure_memory_message_bus.h"
#include "glog/logging.h"
@@ -47,6 +48,8 @@ using tmb::client_id;
namespace quickstep {
void Executor::init() {
+ bus_local_.Initialize();
+
executor_client_id_ = bus_.Connect();
DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
@@ -59,7 +62,7 @@ void Executor::init() {
for (std::size_t worker_thread_index = 0;
worker_thread_index < FLAGS_num_workers;
++worker_thread_index) {
- workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+ workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_));
worker_client_ids.push_back(workers_.back()->getBusClientID());
}
@@ -76,7 +79,7 @@ void Executor::init() {
data_exchanger_.start();
shiftboss_ =
- make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+ make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get());
shiftboss_->start();
for (const auto &worker : workers_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
index 6ffa756..aafeeae 100644
--- a/cli/distributed/Executor.hpp
+++ b/cli/distributed/Executor.hpp
@@ -24,6 +24,7 @@
#include <vector>
#include "cli/distributed/Role.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/Shiftboss.hpp"
#include "query_execution/Worker.hpp"
#include "query_execution/WorkerDirectory.hpp"
@@ -65,6 +66,9 @@ class Executor final : public Role {
void run() override {}
private:
+ // Used between Shiftboss and Workers.
+ MessageBusImpl bus_local_;
+
tmb::client_id executor_client_id_;
std::vector<std::unique_ptr<Worker>> workers_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..5e6014d 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -75,156 +75,164 @@ void Shiftboss::run() {
for (;;) {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
- AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
- << "') received the typed '" << annotated_message.tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
- switch (annotated_message.tagged_message.message_type()) {
- case kQueryInitiateMessage: {
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::QueryInitiateMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
- break;
- }
- case kWorkOrderMessage: {
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::WorkOrderMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- const std::size_t query_id = proto.query_id();
- DCHECK_EQ(1u, query_contexts_.count(query_id));
-
- WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
- shiftboss_index_,
- &database_cache_,
- query_contexts_[query_id].get(),
- storage_manager_,
- shiftboss_client_id_,
- bus_);
-
- unique_ptr<WorkerMessage> worker_message(
- WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
- TaggedMessage worker_tagged_message(worker_message.get(),
- sizeof(*worker_message),
- kWorkOrderMessage);
-
- const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
- << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
- << "') from Foreman to worker " << worker_index;
-
- const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK);
- break;
- }
- case kInitiateRebuildMessage: {
- // Construct rebuild work orders, and send back their number to
- // 'ForemanDistributed'.
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::InitiateRebuildMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processInitiateRebuildMessage(proto.query_id(),
- proto.operator_index(),
- proto.insert_destination_index(),
- proto.relation_id());
- break;
- }
- case kCatalogRelationNewBlockMessage: // Fall through.
- case kDataPipelineMessage:
- case kWorkOrderFeedbackMessage:
- case kWorkOrderCompleteMessage:
- case kRebuildWorkOrderCompleteMessage: {
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
- << "') forwarded typed '" << annotated_message.tagged_message.message_type()
- << "' message from Worker with TMB client ID '" << annotated_message.sender
- << "' to Foreman with TMB client ID " << foreman_client_id_;
-
- DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
- const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(annotated_message.tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK);
- break;
- }
- case kQueryTeardownMessage: {
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ AnnotatedMessage annotated_message;
+ if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) {
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+ << "') received the typed '" << annotated_message.tagged_message.message_type()
+ << "' message from Foreman " << annotated_message.sender;
+ switch (annotated_message.tagged_message.message_type()) {
+ case kQueryInitiateMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryInitiateMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+ break;
+ }
+ case kWorkOrderMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::WorkOrderMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ const std::size_t query_id = proto.query_id();
+ DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+ WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
+ shiftboss_index_,
+ &database_cache_,
+ query_contexts_[query_id].get(),
+ storage_manager_,
+ shiftboss_client_id_local_,
+ bus_local_);
+
+ unique_ptr<WorkerMessage> worker_message(
+ WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
+
+ TaggedMessage worker_tagged_message(worker_message.get(),
+ sizeof(*worker_message),
+ kWorkOrderMessage);
+
+ const size_t worker_index = getSchedulableWorker();
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
+ << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+ << "') from Foreman to worker " << worker_index;
+
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_local_,
+ shiftboss_client_id_local_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+ break;
+ }
+ case kInitiateRebuildMessage: {
+ // Construct rebuild work orders, and send back their number to
+ // 'ForemanDistributed'.
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::InitiateRebuildMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processInitiateRebuildMessage(proto.query_id(),
+ proto.operator_index(),
+ proto.insert_destination_index(),
+ proto.relation_id());
+ break;
+ }
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
- serialization::QueryTeardownMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
- query_contexts_.erase(proto.query_id());
- break;
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
+ case kSaveQueryResultMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::SaveQueryResultMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ for (int i = 0; i < proto.blocks_size(); ++i) {
+ storage_manager_->saveBlockOrBlob(proto.blocks(i));
+ }
+
+ // Clean up query execution states, i.e., QueryContext.
+ query_contexts_.erase(proto.query_id());
+
+ serialization::SaveQueryResultResponseMessage proto_response;
+ proto_response.set_query_id(proto.query_id());
+ proto_response.set_relation_id(proto.relation_id());
+ proto_response.set_cli_id(proto.cli_id());
+ proto_response.set_shiftboss_index(shiftboss_index_);
+
+ const size_t proto_response_length = proto_response.ByteSize();
+ char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+ CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+ TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+ proto_response_length,
+ kSaveQueryResultResponseMessage);
+ free(proto_response_bytes);
+
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+ << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+ << "') to Foreman with TMB client ID " << foreman_client_id_;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_global_,
+ shiftboss_client_id_global_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+ break;
+ }
+ case kPoisonMessage: {
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+ << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+ << "') from Foreman to all workers";
+
+ tmb::MessageStyle broadcast_style;
+ broadcast_style.Broadcast(true);
+
+ const MessageBus::SendStatus send_status =
+ bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style,
+ move(annotated_message.tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+ return;
+ }
+ default: {
+ LOG(FATAL) << "Unknown TMB message type";
+ }
}
- case kSaveQueryResultMessage: {
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::SaveQueryResultMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+ }
- for (int i = 0; i < proto.blocks_size(); ++i) {
- storage_manager_->saveBlockOrBlob(proto.blocks(i));
+ while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) {
+ switch (annotated_message.tagged_message.message_type()) {
+ case kCatalogRelationNewBlockMessage:
+ case kDataPipelineMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrderCompleteMessage:
+ case kRebuildWorkOrderCompleteMessage: {
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+ << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+ << "' message from Worker with TMB client ID '" << annotated_message.sender
+ << "' to Foreman with TMB client ID " << foreman_client_id_;
+
+ DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_global_,
+ shiftboss_client_id_global_,
+ foreman_client_id_,
+ move(annotated_message.tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+ break;
+ }
+ default: {
+ LOG(FATAL) << "Unknown TMB message type";
}
-
- // Clean up query execution states, i.e., QueryContext.
- query_contexts_.erase(proto.query_id());
-
- serialization::SaveQueryResultResponseMessage proto_response;
- proto_response.set_query_id(proto.query_id());
- proto_response.set_relation_id(proto.relation_id());
- proto_response.set_cli_id(proto.cli_id());
- proto_response.set_shiftboss_index(shiftboss_index_);
-
- const size_t proto_response_length = proto_response.ByteSize();
- char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
- CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
- TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
- proto_response_length,
- kSaveQueryResultResponseMessage);
- free(proto_response_bytes);
-
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
- << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
- << "') to Foreman with TMB client ID " << foreman_client_id_;
- const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
- CHECK(send_status == MessageBus::SendStatus::kOK);
- break;
- }
- case kPoisonMessage: {
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
- << "') forwarded PoisonMessage (typed '" << kPoisonMessage
- << "') from Foreman to all workers";
-
- tmb::MessageStyle broadcast_style;
- broadcast_style.Broadcast(true);
-
- const MessageBus::SendStatus send_status =
- bus_->Send(shiftboss_client_id_,
- worker_addresses_,
- broadcast_style,
- move(annotated_message.tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK);
- return;
- }
- default: {
- LOG(FATAL) << "Unknown TMB message type";
}
}
}
@@ -264,21 +272,21 @@ void Shiftboss::registerWithForeman() {
kShiftbossRegistrationMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
<< "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
<< "') to all";
tmb::MessageBus::SendStatus send_status =
- bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
+ bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
void Shiftboss::processShiftbossRegistrationResponseMessage() {
- AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+ AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
foreman_client_id_ = annotated_message.sender;
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_
<< "') received the typed '" << kShiftbossRegistrationResponseMessage
<< "' message from ForemanDistributed with client " << foreman_client_id_;
@@ -289,10 +297,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
// Forward this message to Workers regarding <shiftboss_index_>.
- QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+ QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_,
worker_addresses_,
move(annotated_message.tagged_message),
- bus_);
+ bus_local_);
}
void Shiftboss::processQueryInitiateMessage(
@@ -302,7 +310,7 @@ void Shiftboss::processQueryInitiateMessage(
database_cache_.update(catalog_database_cache_proto);
auto query_context = std::make_unique<QueryContext>(
- query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
+ query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_);
query_contexts_.emplace(query_id, move(query_context));
serialization::QueryInitiateResponseMessage proto;
@@ -317,12 +325,12 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
<< "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
<< "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
+ QueryExecutionUtil::SendTMBMessage(bus_global_,
+ shiftboss_client_id_global_,
foreman_client_id_,
move(message_response));
CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -356,12 +364,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
<< "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
<< "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
+ QueryExecutionUtil::SendTMBMessage(bus_global_,
+ shiftboss_client_id_global_,
foreman_client_id_,
move(message_response));
CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -374,8 +382,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
move(partially_filled_block_refs[i]),
op_index,
rel_id,
- shiftboss_client_id_,
- bus_);
+ shiftboss_client_id_local_,
+ bus_local_);
unique_ptr<WorkerMessage> worker_message(
WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
@@ -385,13 +393,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kRebuildWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
<< "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
<< "') to worker " << worker_index;
const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
+ QueryExecutionUtil::SendTMBMessage(bus_local_,
+ shiftboss_client_id_local_,
workers_->getClientID(worker_index),
move(worker_tagged_message));
CHECK(send_status == MessageBus::SendStatus::kOK);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..4864988 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -61,7 +61,8 @@ class Shiftboss : public Thread {
/**
* @brief Constructor.
*
- * @param bus A pointer to the TMB.
+ * @param bus_global A pointer to the TMB for Foreman.
+ * @param bus_local A pointer to the TMB for Workers.
* @param storage_manager The StorageManager to use.
* @param workers A pointer to the WorkerDirectory.
* @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
@@ -69,69 +70,75 @@ class Shiftboss : public Thread {
* @note If cpu_id is not specified, Shiftboss thread can be possibly moved
* around on different CPUs by the OS.
**/
- Shiftboss(tmb::MessageBus *bus,
+ Shiftboss(tmb::MessageBus *bus_global,
+ tmb::MessageBus *bus_local,
StorageManager *storage_manager,
WorkerDirectory *workers,
const int cpu_id = -1)
- : bus_(DCHECK_NOTNULL(bus)),
+ : bus_global_(DCHECK_NOTNULL(bus_global)),
+ bus_local_(DCHECK_NOTNULL(bus_local)),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
workers_(DCHECK_NOTNULL(workers)),
cpu_id_(cpu_id),
- shiftboss_client_id_(tmb::kClientIdNone),
+ shiftboss_client_id_global_(tmb::kClientIdNone),
+ shiftboss_client_id_local_(tmb::kClientIdNone),
foreman_client_id_(tmb::kClientIdNone),
max_msgs_per_worker_(1),
start_worker_index_(0u) {
// Check to have at least one Worker.
DCHECK_GT(workers->getNumWorkers(), 0u);
- shiftboss_client_id_ = bus_->Connect();
- LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
- DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
+ shiftboss_client_id_global_ = bus_global_->Connect();
+ LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_;
+ DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone);
+
+ shiftboss_client_id_local_ = bus_local_->Connect();
+ DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone);
// Messages between Foreman and Shiftboss.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
// Message sent to Worker.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
+ bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
+ bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
// Forward the following message types from Foreman to Workers.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage);
+ bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage);
// Forward the following message types from Workers to Foreman.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage);
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage);
// Clean up query execution states, i.e., QueryContext.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage);
// Stop itself.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
+ bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage);
// Stop all workers.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
+ bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage);
for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
worker_addresses_.AddRecipient(workers_->getClientID(i));
@@ -149,7 +156,7 @@ class Shiftboss : public Thread {
* @return TMB client ID of shiftboss thread.
**/
inline tmb::client_id getBusClientID() const {
- return shiftboss_client_id_;
+ return shiftboss_client_id_global_;
}
/**
@@ -220,9 +227,7 @@ class Shiftboss : public Thread {
const QueryContext::insert_destination_id dest_index,
const relation_id rel_id);
- // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
- // and Shiftboss and Worker thread pool.
- tmb::MessageBus *bus_;
+ tmb::MessageBus *bus_global_, *bus_local_;
CatalogDatabaseCache database_cache_;
StorageManager *storage_manager_;
@@ -231,7 +236,7 @@ class Shiftboss : public Thread {
// The ID of the CPU that the Shiftboss thread can optionally be pinned to.
const int cpu_id_;
- tmb::client_id shiftboss_client_id_, foreman_client_id_;
+ tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_;
// Unique per Shiftboss instance.
std::uint64_t shiftboss_index_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..71965e6 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
: query_id_(0),
+ bus_locals_(kNumInstances),
data_exchangers_(kNumInstances) {
bus_.Initialize();
@@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
kAnyNUMANodeID);
for (int i = 0; i < kNumInstances; ++i) {
- workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
+ tmb::MessageBus *bus_local = &bus_locals_[i];
+ bus_local->Initialize();
+
+ workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local));
const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
worker_directories_.push_back(
@@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
data_exchangers_[i].set_storage_manager(storage_manager.get());
shiftbosses_.push_back(
- make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+ make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get()));
storage_managers_.push_back(move(storage_manager));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 63e320d..2cd2427 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
std::unique_ptr<ForemanDistributed> foreman_;
+ std::vector<MessageBusImpl> bus_locals_;
std::vector<std::unique_ptr<Worker>> workers_;
std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
std::vector<DataExchangerAsync> data_exchangers_;