You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:41 UTC
[17/30] incubator-quickstep git commit: Introduced Shiftboss for the
distributed version.
Introduced Shiftboss for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7415ee87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7415ee87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7415ee87
Branch: refs/heads/quickstep-28-29
Commit: 7415ee87fa088054e09b2a9efb88389c43351b12
Parents: 9f9e3b7
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 22 13:29:03 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Wed Jul 27 15:06:13 2016 -0700
----------------------------------------------------------------------
catalog/CatalogDatabaseCache.hpp | 5 +
query_execution/CMakeLists.txt | 24 ++
query_execution/QueryExecutionMessages.proto | 30 ++
query_execution/QueryExecutionTypedefs.hpp | 8 +
query_execution/Shiftboss.cpp | 360 ++++++++++++++++++++++
query_execution/Shiftboss.hpp | 241 +++++++++++++++
storage/StorageManager.hpp | 1 +
7 files changed, 669 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/catalog/CatalogDatabaseCache.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 77afe2a..b3e73a6 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -54,6 +54,11 @@ namespace serialization { class CatalogDatabase; }
class CatalogDatabaseCache : public CatalogDatabaseLite {
public:
/**
+ * @brief Constructor.
+ **/
+ CatalogDatabaseCache() {}
+
+ /**
* @brief Constructor. Reconstruct a database cache from its serialized
* Protocol Buffer form.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index f582ba5..8bf1ab1 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -52,6 +52,7 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
endif()
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
@@ -157,6 +158,8 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
quickstep_utility_SortConfiguration_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_catalog_Catalog_proto
+ quickstep_queryexecution_QueryContext_proto
quickstep_relationaloperators_WorkOrder_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionState
@@ -214,6 +217,26 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_utility_Macros
tmb)
if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_Shiftboss
+ glog
+ quickstep_catalog_CatalogDatabaseCache
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryexecution_WorkerMessage
+ quickstep_relationaloperators_RebuildWorkOrder
+ quickstep_relationaloperators_WorkOrderFactory
+ quickstep_storage_InsertDestination
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_threading_Thread
+ quickstep_threading_ThreadUtil
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
quickstep_utility_Macros
tmb)
@@ -272,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
endif()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index fa20993..591ca6c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,8 @@ syntax = "proto2";
package quickstep.serialization;
+import "catalog/Catalog.proto";
+import "query_execution/QueryContext.proto";
import "relational_operators/WorkOrder.proto";
// Used for any messages that do not carry payloads.
@@ -73,6 +75,25 @@ message WorkOrdersAvailableMessage {
}
// Distributed version related messages.
+message ShiftbossRegistrationMessage {  // Sent by Shiftboss to register with Foreman.
+ // The total Work Order processing capacity in Shiftboss, which equals the
+ // sum of the capacity of each worker managed by Shiftboss.
+ required uint64 work_order_capacity = 1;
+}
+
+message ShiftbossRegistrationResponseMessage {  // Foreman's reply to a registration; carries no fields.
+}
+
+message QueryInitiateMessage {  // From Foreman to Shiftboss; sets up per-query state on the Shiftboss.
+ required uint64 query_id = 1;  // Key under which the Shiftboss stores the query's QueryContext.
+ required CatalogDatabase catalog_database_cache = 2;  // Applied to the Shiftboss's CatalogDatabaseCache via update().
+ required QueryContext query_context = 3;  // Used to construct the query's QueryContext on the Shiftboss.
+}
+
+message QueryInitiateResponseMessage {  // Ack from Shiftboss to Foreman for a QueryInitiateMessage.
+ required uint64 query_id = 1;  // Echoes the query_id of the QueryInitiateMessage being acknowledged.
+}
+
message WorkOrderMessage {
required uint64 query_id = 1;
required uint64 operator_index = 2;
@@ -92,6 +113,15 @@ message InitiateRebuildResponseMessage {
required uint64 num_rebuild_work_orders = 3;
}
+message QueryResultRelationMessage {  // From Foreman to Shiftboss; asks it to save the listed blocks.
+ required int32 relation_id = 1;  // Echoed back in QueryResultRelationResponseMessage.
+ repeated fixed64 blocks = 2 [packed=true];  // Block ids the Shiftboss saves, then evicts if still loaded.
+}
+
+message QueryResultRelationResponseMessage {  // Ack from Shiftboss to Foreman for a QueryResultRelationMessage.
+ required int32 relation_id = 1;  // Echoes the relation_id of the message being acknowledged.
+}
+
// BlockLocator related messages.
message BlockDomainRegistrationMessage {
// Format IP:Port, i.e., "0.0.0.0:0".
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 61e76d7..d73d4ee 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,9 +73,17 @@ enum QueryExecutionMessageType : message_type_id {
kPoisonMessage, // From the main thread to Foreman and Workers.
#ifdef QUICKSTEP_DISTRIBUTED
+ kShiftbossRegistrationMessage, // From Shiftboss to Foreman.
+ kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss.
+ kQueryInitiateMessage, // From Foreman to Shiftboss.
+ kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
+
kInitiateRebuildMessage, // From Foreman to Shiftboss.
kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+ kQueryResultRelationMessage, // From Foreman to Shiftboss.
+ kQueryResultRelationResponseMessage, // From Shiftboss to Foreman.
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
new file mode 100644
index 0000000..af56306
--- /dev/null
+++ b/query_execution/Shiftboss.cpp
@@ -0,0 +1,360 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/Shiftboss.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "relational_operators/RebuildWorkOrder.hpp"
+#include "relational_operators/WorkOrderFactory.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class WorkOrder;
+
+// Main loop of the Shiftboss thread. Optionally pins the thread to 'cpu_id_',
+// then blocks on the TMB and dispatches every received message by its type:
+//   - kShiftbossRegistrationResponseMessage: remember the sender as Foreman.
+//   - kQueryInitiateMessage / kInitiateRebuildMessage: delegate to the
+//     corresponding process*() helper, which also acks back to Foreman.
+//   - kWorkOrderMessage: rebuild the WorkOrder from its proto and hand it to a
+//     schedulable Worker.
+//   - Worker-originated completion / pipeline / feedback messages: forward
+//     them unchanged to Foreman.
+//   - kQueryResultRelationMessage: save (and evict) the listed blocks, then ack.
+//   - kPoisonMessage: broadcast the poison to all Workers and return.
+// Any other message type is fatal.
+void Shiftboss::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the shiftboss thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, causing this thread to sleep until next
+    // message is received.
+    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') received the typed '" << annotated_message.tagged_message.message_type()
+              << "' message from client " << annotated_message.sender;
+    switch (annotated_message.tagged_message.message_type()) {
+      case kShiftbossRegistrationResponseMessage: {
+        // Whoever answers the registration is treated as Foreman from now on.
+        foreman_client_id_ = annotated_message.sender;
+        break;
+      }
+      case kQueryInitiateMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryInitiateMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+        break;
+      }
+      case kWorkOrderMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::WorkOrderMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        // The query must have been initiated on this Shiftboss beforehand.
+        const std::size_t query_id = proto.query_id();
+        DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
+                                                                       &database_cache_,
+                                                                       query_contexts_[query_id].get(),
+                                                                       storage_manager_,
+                                                                       shiftboss_client_id_,
+                                                                       bus_);
+
+        unique_ptr<WorkerMessage> worker_message(
+            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
+
+        // The WorkerMessage object itself is the payload; it carries the raw
+        // WorkOrder pointer to the Worker.
+        TaggedMessage worker_tagged_message(worker_message.get(),
+                                            sizeof(*worker_message),
+                                            kWorkOrderMessage);
+
+        const size_t worker_index = getSchedulableWorker();
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                  << "') from Foreman to worker " << worker_index;
+
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           workers_->getClientID(worker_index),
+                                           move(worker_tagged_message));
+        break;
+      }
+      case kInitiateRebuildMessage: {
+        // Construct rebuild work orders, and send back their number to
+        // 'ForemanDistributed'.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::InitiateRebuildMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processInitiateRebuildMessage(proto.query_id(),
+                                      proto.operator_index(),
+                                      proto.insert_destination_index(),
+                                      proto.relation_id());
+        break;
+      }
+      case kWorkOrderCompleteMessage:  // Fall through.
+      case kRebuildWorkOrderCompleteMessage:
+      case kDataPipelineMessage:
+      case kWorkOrdersAvailableMessage:
+      case kWorkOrderFeedbackMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+
+        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(annotated_message.tagged_message));
+        break;
+      }
+      case kQueryResultRelationMessage: {
+        // TODO(zuyu): Rename to kSaveQueryResultMessage.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryResultRelationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        for (int i = 0; i < proto.blocks_size(); ++i) {
+          const block_id block = proto.blocks(i);
+          storage_manager_->saveBlockOrBlob(block);
+          if (storage_manager_->blockOrBlobIsLoaded(block)) {
+            // NOTE(zuyu): eviction is required to avoid accesses to the query
+            // result relation schema in CatalogDatabaseCache, for all query
+            // optimizer execution generator unit tests and the single-process
+            // Quickstep CLI.
+            storage_manager_->evictBlockOrBlob(block);
+          }
+        }
+
+        serialization::QueryResultRelationResponseMessage ack_proto;
+        ack_proto.set_relation_id(proto.relation_id());
+
+        const size_t ack_proto_length = ack_proto.ByteSize();
+        char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
+        CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
+
+        // The scratch buffer is released right after the message is built;
+        // the message is still sent afterwards, so this relies on
+        // TaggedMessage holding its own copy of the bytes.
+        TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
+                                  ack_proto_length,
+                                  kQueryResultRelationResponseMessage);
+        free(ack_proto_bytes);
+
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
+                  << "') to Foreman";
+
+        // Same precondition as the other code paths that reply to Foreman.
+        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(ack_message));
+        break;
+      }
+      case kPoisonMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                  << "') from Foreman to all workers";
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        tmb::MessageBus::SendStatus send_status =
+            bus_->Send(shiftboss_client_id_,
+                       worker_addresses_,
+                       broadcast_style,
+                       move(annotated_message.tagged_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+        // Leaving run() ends the Shiftboss thread's message loop.
+        return;
+      }
+      default: {
+        LOG(FATAL) << "Unknown TMB message type";
+      }
+    }
+  }
+}
+
+// Returns the index of the first Worker, probing circularly from
+// 'start_worker_index_', whose number of queued WorkOrders is below
+// 'max_msgs_per_worker_'. Keeps probing until such a Worker is found.
+size_t Shiftboss::getSchedulableWorker() {
+  const size_t worker_count = workers_->getNumWorkers();
+
+  // The loop has no termination condition of its own; it is left only through
+  // the 'return' once a Worker with spare capacity shows up.
+  for (size_t candidate = start_worker_index_;
+       ;
+       candidate = (candidate + 1) % worker_count) {
+    if (workers_->getNumQueuedWorkOrders(candidate) >= max_msgs_per_worker_) {
+      // This Worker is saturated; move on to its neighbour.
+      continue;
+    }
+
+    // The next search resumes right after the Worker picked now.
+    start_worker_index_ = (candidate + 1) % worker_count;
+    // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(candidate);
+    // But we need a WorkOrder queue first.
+    return candidate;
+  }
+}
+
+// Announces this Shiftboss on the bus: sends a ShiftbossRegistrationMessage
+// carrying the total WorkOrder capacity to all TMB addresses.
+void Shiftboss::registerWithForeman() {
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+            << "') to all";
+
+  // Build and serialize the registration payload.
+  serialization::ShiftbossRegistrationMessage registration_proto;
+  registration_proto.set_work_order_capacity(getWorkOrderCapacity());
+
+  const size_t num_bytes = registration_proto.ByteSize();
+  char *buffer = static_cast<char*>(malloc(num_bytes));
+  CHECK(registration_proto.SerializeToArray(buffer, num_bytes));
+
+  // The scratch buffer is freed as soon as the message is built; the message
+  // is sent afterwards, so this relies on TaggedMessage keeping its own copy.
+  TaggedMessage registration_message(static_cast<const void*>(buffer),
+                                     num_bytes,
+                                     kShiftbossRegistrationMessage);
+  free(buffer);
+
+  // Address every client on the bus, using a default message style.
+  tmb::Address everyone;
+  everyone.All(true);
+
+  tmb::MessageStyle default_style;
+
+  const tmb::MessageBus::SendStatus send_status =
+      bus_->Send(shiftboss_client_id_, everyone, default_style, move(registration_message));
+  DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+}
+
+// Prepares this Shiftboss for a new query: refreshes the database cache from
+// the given proto, builds the query's QueryContext and stores it under
+// 'query_id', then acks to Foreman with a QueryInitiateResponseMessage.
+void Shiftboss::processQueryInitiateMessage(
+    const std::size_t query_id,
+    const serialization::CatalogDatabase &catalog_database_cache_proto,
+    const serialization::QueryContext &query_context_proto) {
+  // The cache is updated first, since the QueryContext below is built from it.
+  database_cache_.update(catalog_database_cache_proto);
+
+  unique_ptr<QueryContext> new_context(new QueryContext(query_context_proto,
+                                                        database_cache_,
+                                                        storage_manager_,
+                                                        shiftboss_client_id_,
+                                                        bus_));
+  query_contexts_.emplace(query_id, move(new_context));
+
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+            << "') to Foreman";
+
+  // Serialize the acknowledgement, which only echoes the query id.
+  serialization::QueryInitiateResponseMessage response_proto;
+  response_proto.set_query_id(query_id);
+
+  const size_t response_length = response_proto.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_length));
+  CHECK(response_proto.SerializeToArray(response_bytes, response_length));
+
+  // The scratch buffer is freed right after the message is built; the message
+  // is sent afterwards, so this relies on TaggedMessage keeping its own copy.
+  TaggedMessage response_message(static_cast<const void*>(response_bytes),
+                                 response_length,
+                                 kQueryInitiateResponseMessage);
+  free(response_bytes);
+
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     shiftboss_client_id_,
+                                     foreman_client_id_,
+                                     move(response_message));
+}
+
+// Handles a rebuild request from Foreman: collects the partially filled blocks
+// of the given InsertDestination, reports their count back to Foreman, and
+// then dispatches one RebuildWorkOrder per block to the Workers.
+void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
+                                              const std::size_t op_index,
+                                              const QueryContext::insert_destination_id dest_index,
+                                              const relation_id rel_id) {
+  DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+
+  DCHECK_EQ(1u, query_contexts_.count(query_id));
+  InsertDestination *destination = query_contexts_[query_id]->getInsertDestination(dest_index);
+  DCHECK(destination != nullptr);
+
+  vector<MutableBlockReference> blocks_to_rebuild;
+  destination->getPartiallyFilledBlocks(&blocks_to_rebuild);
+
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+            << "') to Foreman";
+
+  // Tell Foreman how many rebuild work orders are about to be issued.
+  serialization::InitiateRebuildResponseMessage response_proto;
+  response_proto.set_query_id(query_id);
+  response_proto.set_operator_index(op_index);
+  response_proto.set_num_rebuild_work_orders(blocks_to_rebuild.size());
+
+  const size_t response_length = response_proto.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_length));
+  CHECK(response_proto.SerializeToArray(response_bytes, response_length));
+
+  // The scratch buffer is freed right after the message is built; the message
+  // is sent afterwards, so this relies on TaggedMessage keeping its own copy.
+  TaggedMessage response_message(static_cast<const void*>(response_bytes),
+                                 response_length,
+                                 kInitiateRebuildResponseMessage);
+  free(response_bytes);
+
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     shiftboss_client_id_,
+                                     foreman_client_id_,
+                                     move(response_message));
+
+  // The ack above goes out before any rebuild work order is dispatched.
+  for (MutableBlockReference &block_ref : blocks_to_rebuild) {
+    // NOTE(zuyu): Worker releases the memory after the execution of
+    // RebuildWorkOrder on the Worker.
+    WorkOrder *rebuild_work_order = new RebuildWorkOrder(query_id,
+                                                         move(block_ref),
+                                                         op_index,
+                                                         rel_id,
+                                                         shiftboss_client_id_,
+                                                         bus_);
+
+    unique_ptr<WorkerMessage> rebuild_message(
+        WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
+
+    TaggedMessage rebuild_tagged_message(rebuild_message.get(),
+                                         sizeof(*rebuild_message),
+                                         kRebuildWorkOrderMessage);
+
+    const size_t target_worker = getSchedulableWorker();
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+              << "') to worker " << target_worker;
+
+    QueryExecutionUtil::SendTMBMessage(bus_,
+                                       shiftboss_client_id_,
+                                       workers_->getClientID(target_worker),
+                                       move(rebuild_tagged_message));
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
new file mode 100644
index 0000000..096ab74
--- /dev/null
+++ b/query_execution/Shiftboss.hpp
@@ -0,0 +1,241 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+
+#include "catalog/CatalogDatabaseCache.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+namespace serialization {
+class CatalogDatabase;
+class QueryContext;
+} // namespace serialization
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Shiftboss accepts workorder protos from Foreman, and assigns
+ *        the workorders to workers.
+ **/
+class Shiftboss : public Thread {
+ public:
+  /**
+   * @brief Constructor. Connects to the TMB, registers the message types this
+   *        Shiftboss sends and receives, records the TMB addresses of all
+   *        Workers, and finally registers with Foreman.
+   *
+   * @param bus A pointer to the TMB.
+   * @param storage_manager The StorageManager to use.
+   * @param workers A pointer to the WorkerDirectory.
+   * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
+   *
+   * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
+   *       around on different CPUs by the OS.
+   **/
+  Shiftboss(tmb::MessageBus *bus,
+            StorageManager *storage_manager,
+            WorkerDirectory *workers,
+            const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        workers_(DCHECK_NOTNULL(workers)),
+        cpu_id_(cpu_id),
+        shiftboss_client_id_(tmb::kClientIdNone),
+        foreman_client_id_(tmb::kClientIdNone),
+        max_msgs_per_worker_(1),
+        start_worker_index_(0u) {
+    // Check to have at least one Worker.
+    DCHECK_GT(workers->getNumWorkers(), 0u);
+
+    shiftboss_client_id_ = bus_->Connect();
+    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
+    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
+
+    // Messages between Foreman and Shiftboss.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+
+    // Message sent to Worker.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
+
+    // Message sent to Foreman.
+    // NOTE(review): run() also forwards kDataPipelineMessage,
+    // kWorkOrdersAvailableMessage and kWorkOrderFeedbackMessage coming from
+    // Workers, but only sender registration is done for them here -- confirm
+    // whether receiver registration is needed as well.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
+    // Forward the following message types from Foreman to Workers.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
+
+    // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);
+
+    // Stop itself.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
+    // Stop all workers.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
+
+    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
+      worker_addresses_.AddRecipient(workers_->getClientID(i));
+    }
+
+    registerWithForeman();
+  }
+
+  ~Shiftboss() override {
+  }
+
+  /**
+   * @brief Get the TMB client ID of Shiftboss thread.
+   *
+   * @return TMB client ID of shiftboss thread.
+   **/
+  inline tmb::client_id getBusClientID() const {
+    return shiftboss_client_id_;
+  }
+
+  /**
+   * @brief Get the Work Order processing capacity of all Workers managed by
+   *        Shiftboss during a single round of WorkOrder dispatch.
+   *
+   * @return The per-Worker message limit multiplied by the number of Workers.
+   **/
+  inline std::size_t getWorkOrderCapacity() const {
+    DCHECK_NE(max_msgs_per_worker_, 0u);
+    return max_msgs_per_worker_ * workers_->getNumWorkers();
+  }
+
+  /**
+   * @brief Get the Worker to assign WorkOrders for execution. Block to wait if
+   *        all Workers have reached their capacity for queued WorkOrders.
+   *
+   * @return The index of the chosen Worker in the WorkerDirectory.
+   **/
+  // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
+  // normal Work Order protos from Foreman and the generated rebuild Work Orders.
+  // NOTE: Deliberately not declared 'inline': the definition lives in
+  // Shiftboss.cpp, and an inline function must be defined in every translation
+  // unit that uses it.
+  std::size_t getSchedulableWorker();
+
+  /**
+   * @brief Set the maximum number of messages that should be allocated to each
+   *        worker during a single round of WorkOrder dispatch.
+   *
+   * @param max_msgs_per_worker Maximum number of messages.
+   *
+   * @note With a value of 0 no Worker ever counts as schedulable, so
+   *       getSchedulableWorker() would never return.
+   **/
+  inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
+    max_msgs_per_worker_ = max_msgs_per_worker;
+  }
+
+ protected:
+  /**
+   * @brief The shiftboss receives workorders, and based on the response it
+   *        assigns workorders to workers.
+   *
+   * @note The workers who get the messages from the Shiftboss execute and
+   *       subsequently delete the WorkOrder contained in the message.
+   **/
+  void run() override;
+
+ private:
+  /**
+   * @brief Send a ShiftbossRegistrationMessage, carrying the Work Order
+   *        capacity of this Shiftboss, to all TMB addresses.
+   **/
+  void registerWithForeman();
+
+  /**
+   * @brief Process the Shiftboss initiate message and ack back.
+   *
+   * @param query_id The given query id.
+   * @param catalog_database_cache_proto The proto used to update
+   *        CatalogDatabaseCache.
+   * @param query_context_proto The QueryContext proto.
+   **/
+  void processQueryInitiateMessage(const std::size_t query_id,
+                                   const serialization::CatalogDatabase &catalog_database_cache_proto,
+                                   const serialization::QueryContext &query_context_proto);
+
+  /**
+   * @brief Process the RebuildWorkOrder initiate message and ack back.
+   *
+   * @param query_id The ID of the query to which this RebuildWorkOrder initiate
+   *        message belongs.
+   * @param op_index The index of the operator for rebuild work orders.
+   * @param dest_index The InsertDestination index in QueryContext to rebuild.
+   * @param rel_id The relation that needs to generate rebuild work orders.
+   **/
+  void processInitiateRebuildMessage(const std::size_t query_id,
+                                     const std::size_t op_index,
+                                     const QueryContext::insert_destination_id dest_index,
+                                     const relation_id rel_id);
+
+  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
+  // and Shiftboss and Worker thread pool.
+  tmb::MessageBus *bus_;
+
+  CatalogDatabaseCache database_cache_;
+  StorageManager *storage_manager_;
+  WorkerDirectory *workers_;
+
+  // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
+  const int cpu_id_;
+
+  // 'foreman_client_id_' stays tmb::kClientIdNone until the registration
+  // response has been received in run().
+  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+
+  // TMB recipients for all workers managed by this Shiftboss.
+  tmb::Address worker_addresses_;
+
+  // During a single round of WorkOrder dispatch, a Worker should be allocated
+  // at most these many WorkOrders.
+  std::size_t max_msgs_per_worker_;
+
+  // The worker index for scheduling Work Order.
+  std::size_t start_worker_index_;
+
+  // QueryContexts per query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryContext>> query_contexts_;
+
+  DISALLOW_COPY_AND_ASSIGN(Shiftboss);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 50ddb0f..348018f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -619,6 +619,7 @@ class StorageManager {
FRIEND_TEST(BlockLocatorTest, BlockTest);
FRIEND_TEST(BlockLocatorTest, BlobTest);
+ friend class Shiftboss;
FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);