You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:36 UTC
[12/30] incubator-quickstep git commit: Added QueryManagerDistributed.
Added QueryManagerDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cdecf0b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cdecf0b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cdecf0b5
Branch: refs/heads/quickstep-28-29
Commit: cdecf0b561d0cf1c80516d51401bd098a6263659
Parents: db0e7e3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jul 12 22:04:54 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 19 11:46:42 2016 -0700
----------------------------------------------------------------------
CMakeLists.txt | 10 ++
query_execution/CMakeLists.txt | 34 +++-
query_execution/QueryExecutionMessages.proto | 34 +++-
query_execution/QueryExecutionTypedefs.hpp | 3 +
query_execution/QueryManagerDistributed.cpp | 159 +++++++++++++++++++
query_execution/QueryManagerDistributed.hpp | 105 +++++++++++++
query_execution/ShiftbossDirectory.hpp | 181 ++++++++++++++++++++++
7 files changed, 518 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0e5d3de..0bbde61 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -281,6 +281,16 @@ else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
endif()
+ if (ENABLE_DISTRIBUTED)
+ # Clang reports such warning when using Protoc 3.0 beta.
+ if(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
+ CHECK_CXX_COMPILER_FLAG("-Wno-extended-offsetof" COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+ if (COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-extended-offsetof")
+ endif()
+ endif()
+ endif()
+
# One of the protobuf headers includes a nested anonymous union within
# another anonymous type. Ordinarily we work around this by compiling the
# protobuf libraries themselves with "-Wno-nested-anon-types" and including
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 028531d..8c12a5d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -28,10 +28,10 @@ else()
endif()
# Declare micro-libs:
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
-add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -46,7 +46,13 @@ add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryE
add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
+endif()
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
+endif()
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -143,6 +149,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
quickstep_utility_SortConfiguration_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_relationaloperators_WorkOrder_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_queryexecution_QueryExecutionState
glog
@@ -167,6 +174,22 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
quickstep_storage_StorageBlockInfo
quickstep_utility_DAG
quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_utility_DAG
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
@@ -182,6 +205,11 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
glog
quickstep_relationaloperators_WorkOrder_proto
@@ -233,7 +261,9 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_WorkerSelectionPolicy)
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
- quickstep_queryexecution_BlockLocator)
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_ShiftbossDirectory)
endif()
# Tests:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..fa20993 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,22 +16,24 @@ syntax = "proto2";
package quickstep.serialization;
+import "relational_operators/WorkOrder.proto";
+
// Used for any messages that do not carry payloads.
message EmptyMessage {
}
// Note: There are different types of completion messages for normal work orders
// rebuild work orders. This can be potentially helpful when we want to collect
-// different statistics for executing different types of work orders.
-// e.g. In select normal work order completion message, we could be interested
-// in the selectivity of the block whose work order got execute. In rebuild work
-// order completion message, we may be interested in adding the compression
+// different statistics for executing different types of work orders.
+// e.g. In select normal work order completion message, we could be interested
+// in the selectivity of the block whose work order got execute. In rebuild work
+// order completion message, we may be interested in adding the compression
// ratio or dictionary size of the rebuilt block.
-// TODO(harshad) : If there are different fields in the two message types below,
+// TODO(harshad) : If there are different fields in the two message types below,
// create a base message class called WorkOrderCompletionMessage and make the
// two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class.
+// classes can be moved to the base class.
// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
message NormalWorkOrderCompletionMessage {
@@ -70,6 +72,26 @@ message WorkOrdersAvailableMessage {
required uint64 query_id = 2;
}
+// Distributed version related messages.
+message WorkOrderMessage {
+ required uint64 query_id = 1;
+ required uint64 operator_index = 2;
+ required WorkOrder work_order = 3;
+}
+
+message InitiateRebuildMessage {
+ required uint64 query_id = 1;
+ required uint64 operator_index = 2;
+ required uint64 insert_destination_index = 3;
+ required int32 relation_id = 4;
+}
+
+message InitiateRebuildResponseMessage {
+ required uint64 query_id = 1;
+ required uint64 operator_index = 2;
+ required uint64 num_rebuild_work_orders = 3;
+}
+
// BlockLocator related messages.
message BlockDomainRegistrationMessage {
// Format IP:Port, i.e., "0.0.0.0:0".
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..61e76d7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,6 +73,9 @@ enum QueryExecutionMessageType : message_type_id {
kPoisonMessage, // From the main thread to Foreman and Workers.
#ifdef QUICKSTEP_DISTRIBUTED
+ kInitiateRebuildMessage, // From Foreman to Shiftboss.
+ kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
new file mode 100644
index 0000000..e906fa5
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+
+namespace quickstep {
+
+QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
+ ShiftbossDirectory *shiftbosses,
+ const tmb::client_id foreman_client_id,
+ tmb::MessageBus *bus)
+ : QueryManagerBase(query_handle),
+ shiftbosses_(shiftbosses),
+ foreman_client_id_(foreman_client_id),
+ bus_(bus),
+ normal_workorder_protos_container_(
+ new WorkOrderProtosContainer(num_operators_in_dag_)) {
+ // Collect all the workorders from all the relational operators in the DAG.
+ for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
+ if (checkAllBlockingDependenciesMet(index)) {
+ query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
+ processOperator(index, false);
+ }
+ }
+}
+
+serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
+ const dag_node_index start_operator_index) {
+ // Default policy: Operator with lowest index first.
+ size_t num_operators_checked = 0;
+ for (dag_node_index index = start_operator_index;
+ num_operators_checked < num_operators_in_dag_;
+ index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
+ if (query_exec_state_->hasExecutionFinished(index)) {
+ continue;
+ }
+ unique_ptr<serialization::WorkOrder> work_order_proto(
+ normal_workorder_protos_container_->getWorkOrderProto(index));
+ if (work_order_proto != nullptr) {
+ query_exec_state_->incrementNumQueuedWorkOrders(index);
+
+ unique_ptr<serialization::WorkOrderMessage> message_proto(new serialization::WorkOrderMessage);
+ message_proto->set_query_id(query_id_);
+ message_proto->set_operator_index(index);
+ message_proto->mutable_work_order()->MergeFrom(*work_order_proto);
+
+ return message_proto.release();
+ }
+ }
+ // No normal WorkOrder protos available right now.
+ return nullptr;
+}
+
+bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
+ bool generated_new_workorder_protos = false;
+ if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+ // Do not fetch any work units until all blocking dependencies are met.
+ // The releational operator is not aware of blocking dependencies for
+ // uncorrelated scalar queries.
+ if (!checkAllBlockingDependenciesMet(index)) {
+ return false;
+ }
+ const size_t num_pending_workorder_protos_before =
+ normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+ const bool done_generation =
+ query_dag_->getNodePayloadMutable(index)
+ ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
+ if (done_generation) {
+ query_exec_state_->setDoneGenerationWorkOrders(index);
+ }
+
+ // TODO(shoban): It would be a good check to see if operator is making
+ // useful progress, i.e., the operator either generates work orders to
+ // execute or still has pending work orders executing. However, this will not
+ // work if Foreman polls operators without feeding data. This check can be
+ // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+ // pending work orders are completed or new input blocks feed.
+
+ generated_new_workorder_protos =
+ (num_pending_workorder_protos_before <
+ normal_workorder_protos_container_->getNumWorkOrderProtos(index));
+ }
+ return generated_new_workorder_protos;
+}
+
+bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
+ DCHECK(checkRebuildRequired(index));
+ DCHECK(!checkRebuildInitiated(index));
+
+ const RelationalOperator &op = query_dag_->getNodePayload(index);
+ DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
+
+ serialization::InitiateRebuildMessage proto;
+ proto.set_operator_index(index);
+ proto.set_insert_destination_index(op.getInsertDestinationID());
+ proto.set_relation_id(op.getOutputRelationID());
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes),
+ proto_length,
+ kInitiateRebuildMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+ << "') to Shiftboss";
+ // TODO(zuyu): Multiple workers support.
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftbosses_->getClientId(0),
+ move(tagged_msg));
+
+ // The negative value indicates that the number of rebuild work orders is to be
+ // determined.
+ query_exec_state_->setRebuildStatus(index, -1, true);
+
+ // Wait for Shiftbosses to report the number of rebuild work orders.
+ return false;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
new file mode 100644
index 0000000..8641c22
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+
+#include <memory>
+
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class QueryHandle;
+class ShiftbossDirectory;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query including generation
+ * of new work orders, keeping track of the query exection state.
+ **/
+class QueryManagerDistributed final : public QueryManagerBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_handle The QueryHandle object for this query.
+ * @param shiftbosses The ShiftbossDirectory to use.
+ * @param foreman_client_id The TMB client ID of the foreman thread.
+ * @param bus The TMB used for communication.
+ **/
+ QueryManagerDistributed(QueryHandle *query_handle,
+ ShiftbossDirectory *shiftbosses,
+ const tmb::client_id foreman_client_id,
+ tmb::MessageBus *bus);
+
+ ~QueryManagerDistributed() override {}
+
+ bool fetchNormalWorkOrders(const dag_node_index index) override;
+
+ /**
+ * @brief Get the next normal workorder to be excuted, wrapped in a
+ * WorkOrderMessage proto.
+ *
+ * @param start_operator_index Begin the search for the schedulable WorkOrder
+ * with the operator at this index.
+ *
+ * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
+ * to be executed, return NULL.
+ **/
+ serialization::WorkOrderMessage* getNextWorkOrderMessage(
+ const dag_node_index start_operator_index);
+
+ private:
+ bool checkNormalExecutionOver(const dag_node_index index) const override {
+ return (checkAllDependenciesMet(index) &&
+ !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+ query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
+ query_exec_state_->hasDoneGenerationWorkOrders(index));
+ }
+
+ bool initiateRebuild(const dag_node_index index) override;
+
+ bool checkRebuildOver(const dag_node_index index) const override {
+ return query_exec_state_->hasRebuildInitiated(index) &&
+ (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+ }
+
+ ShiftbossDirectory *shiftbosses_;
+
+ const tmb::client_id foreman_client_id_;
+ tmb::MessageBus *bus_;
+
+ std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
+
+ DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
new file mode 100644
index 0000000..abb5f32
--- /dev/null
+++ b/query_execution/ShiftbossDirectory.hpp
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class which keeps the metadata about the shiftbosses.
+ *
+ * @note This class is intended to be used only by ForemanDistributed thread.
+ * Therefore, none of the methods in this class are thread-safe.
+ **/
+class ShiftbossDirectory {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ ShiftbossDirectory() = default;
+
+ /**
+ * @brief Add the Shiftboss.
+ *
+ * @param shiftboss_id the TMB client ID of Shiftboss thread.
+ * @param capacity The Work Order processing capacity of Shiftboss.
+ **/
+ void addShiftboss(const tmb::client_id shiftboss_id,
+ const std::size_t capacity) {
+ client_id_indices_.emplace(shiftboss_id, client_ids_.size());
+ client_ids_.push_back(shiftboss_id);
+ work_order_capacities_.push_back(capacity);
+ num_queued_work_orders_.push_back(0u);
+ }
+
+ /**
+ * @brief Whether the ShiftbossDirectory has any Shiftboss.
+ *
+ * @return True if no Shiftboss in ShiftbossDirectory. Otherwise false.
+ **/
+ bool empty() const {
+ DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
+ DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
+ DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
+
+ return client_ids_.empty();
+ }
+
+ /**
+ * @brief Get the number of Shiftboss in ShiftbossDirectory.
+ *
+ * @return The number of Shiftboss in ShiftbossDirectory.
+ **/
+ std::size_t size() const {
+ DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
+ DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
+ DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
+
+ return client_ids_.size();
+ }
+
+ /**
+ * @brief Get the TMB client ID of the specified Shiftboss.
+ *
+ * @param shiftboss_index The index of Shiftboss.
+ *
+ * @return The TMB client ID of the given Shiftboss.
+ **/
+ tmb::client_id getClientId(const std::size_t shiftboss_index) const {
+ DCHECK_LT(shiftboss_index, size());
+ return client_ids_[shiftboss_index];
+ }
+
+ /**
+ * @brief Get the Shiftboss index from the specified client id.
+ *
+ * @param shiftboss_id The TMB client ID of Shiftboss.
+ *
+ * @return The index of the given Shiftboss.
+ **/
+ std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
+ const auto it = client_id_indices_.find(shiftboss_id);
+ DCHECK(it != client_id_indices_.end());
+
+ return it->second;
+ }
+
+ /**
+ * @brief Whether the given Shiftboss has reached its capacity.
+ *
+ * @param shiftboss_index The index of Shiftboss.
+ *
+ * @return True if reached the capacity. Otherwise false.
+ **/
+ bool hasReachedCapacity(const std::size_t shiftboss_index) const {
+ DCHECK_LT(shiftboss_index, size());
+ return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
+ }
+
+ /**
+ * @brief Add the number of new work orders for the given Shiftboss.
+ *
+ * @param shiftboss_index The index of Shiftboss.
+ * @param num_new_work_orders The number of the new work orders will be
+ * executed on Shiftboss indexed by 'shiftboss_index'.
+ **/
+ void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
+ const std::size_t num_new_work_orders) {
+ num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
+ }
+
+ /**
+ * @brief Increase the number of queued workorders for the given Shiftboss by 1.
+ *
+ * @param shiftboss_index The index of Shiftboss.
+ **/
+ void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+ DCHECK_LT(shiftboss_index, size());
+ ++num_queued_work_orders_[shiftboss_index];
+ }
+
+ /**
+ * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
+ *
+ * @param shiftboss_index The index of Shiftboss.
+ **/
+ void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+ DCHECK_LT(shiftboss_index, size());
+ DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
+ --num_queued_work_orders_[shiftboss_index];
+ }
+
+ private:
+ // The TMB client IDs of Shiftbosses.
+ // TODO(zuyu): Support deletions, as Shiftbosses go down.
+ std::vector<tmb::client_id> client_ids_;
+
+ // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
+ std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
+
+ // The max number of WorkOrders per Shiftboss.
+ std::vector<std::size_t> work_order_capacities_;
+
+ // The number of WorkOrders queued for execution per Shiftboss, and the value
+ // should be not greater than that of work_order_capacities_.
+ std::vector<std::size_t> num_queued_work_orders_;
+
+ DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_