You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:36 UTC

[12/30] incubator-quickstep git commit: Added QueryManagerDistributed.

Added QueryManagerDistributed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cdecf0b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cdecf0b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cdecf0b5

Branch: refs/heads/quickstep-28-29
Commit: cdecf0b561d0cf1c80516d51401bd098a6263659
Parents: db0e7e3
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jul 12 22:04:54 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 19 11:46:42 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                               |  10 ++
 query_execution/CMakeLists.txt               |  34 +++-
 query_execution/QueryExecutionMessages.proto |  34 +++-
 query_execution/QueryExecutionTypedefs.hpp   |   3 +
 query_execution/QueryManagerDistributed.cpp  | 159 +++++++++++++++++++
 query_execution/QueryManagerDistributed.hpp  | 105 +++++++++++++
 query_execution/ShiftbossDirectory.hpp       | 181 ++++++++++++++++++++++
 7 files changed, 518 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0e5d3de..0bbde61 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -281,6 +281,16 @@ else()
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
   endif()
 
+  if (ENABLE_DISTRIBUTED)
+    # Clang reports "extended-offsetof" warnings when using Protoc 3.0 beta.
+    if(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
+      CHECK_CXX_COMPILER_FLAG("-Wno-extended-offsetof" COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+      if (COMPILER_HAS_WNO_EXTENDED_OFFSETOF)
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-extended-offsetof")
+      endif()
+    endif()
+  endif()
+
   # One of the protobuf headers includes a nested anonymous union within
   # another anonymous type. Ordinarily we work around this by compiling the
   # protobuf libraries themselves with "-Wno-nested-anon-types" and including

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 028531d..8c12a5d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -28,10 +28,10 @@ else()
 endif()
 
 # Declare micro-libs:
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
-add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -46,7 +46,13 @@ add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryE
 add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
 add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
 add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
+endif()
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
+endif()
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -143,6 +149,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_utility_SortConfiguration_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_relationaloperators_WorkOrder_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionState
                       glog
@@ -167,6 +174,22 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_DAG
                       quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryexecution_WorkOrderProtosContainer
+                        quickstep_relationaloperators_RelationalOperator
+                        quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_utility_DAG
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
@@ -182,6 +205,11 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
                       glog
                       quickstep_relationaloperators_WorkOrder_proto
@@ -233,7 +261,9 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_WorkerSelectionPolicy)
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
-                        quickstep_queryexecution_BlockLocator)
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory)
 endif()
 
 # Tests:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..fa20993 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,22 +16,24 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+import "relational_operators/WorkOrder.proto";
+
 // Used for any messages that do not carry payloads.
 message EmptyMessage {
 }
 
 // Note: There are different types of completion messages for normal work orders
 // rebuild work orders. This can be potentially helpful when we want to collect
-// different statistics for executing different types of work orders. 
-// e.g. In select normal work order completion message, we could be interested 
-// in the selectivity of the block whose work order got execute. In rebuild work 
-// order completion message, we may be interested in adding the compression 
+// different statistics for executing different types of work orders.
+// e.g. In select normal work order completion message, we could be interested
+// in the selectivity of the block whose work order got executed. In rebuild work
+// order completion message, we may be interested in adding the compression
 // ratio or dictionary size of the rebuilt block.
 
-// TODO(harshad) : If there are different fields in the two message types below, 
+// TODO(harshad) : If there are different fields in the two message types below,
 // create a base message class called WorkOrderCompletionMessage and make the
 // two classes below extend the base class. All the common fields in both the
-// classes can be moved to the base class. 
+// classes can be moved to the base class.
 
 // A message sent upon completion of a normal (not rebuild) WorkOrder execution.
 message NormalWorkOrderCompletionMessage {
@@ -70,6 +72,26 @@ message WorkOrdersAvailableMessage {
   required uint64 query_id = 2;
 }
 
+// Distributed version related messages.
+// Wraps a single WorkOrder together with the ID of the query and the index of
+// the operator that the WorkOrder belongs to.
+message WorkOrderMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required WorkOrder work_order = 3;
+}
+
+// Sent from Foreman to Shiftboss (as kInitiateRebuildMessage) to initiate the
+// rebuild for an operator. All fields are required, so the sender must set
+// every one of them before serializing the message.
+message InitiateRebuildMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required uint64 insert_destination_index = 3;
+  required  int32 relation_id = 4;
+}
+
+// Sent from Shiftboss to Foreman (as kInitiateRebuildResponseMessage) to report
+// the number of rebuild work orders for the given operator of the given query.
+message InitiateRebuildResponseMessage {
+  required uint64 query_id = 1;
+  required uint64 operator_index = 2;
+  required uint64 num_rebuild_work_orders = 3;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..61e76d7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,6 +73,9 @@ enum QueryExecutionMessageType : message_type_id {
   kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
+  kInitiateRebuildMessage,  // From Foreman to Shiftboss.
+  kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
new file mode 100644
index 0000000..e906fa5
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -0,0 +1,159 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+
+namespace quickstep {
+
+QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
+                                                 ShiftbossDirectory *shiftbosses,
+                                                 const tmb::client_id foreman_client_id,
+                                                 tmb::MessageBus *bus)
+    : QueryManagerBase(query_handle),
+      shiftbosses_(shiftbosses),
+      foreman_client_id_(foreman_client_id),
+      bus_(bus),
+      normal_workorder_protos_container_(
+          new WorkOrderProtosContainer(num_operators_in_dag_)) {
+  // Walk every relational operator in the DAG once. Operators whose blocking
+  // dependencies are already satisfied are told so and processed right away;
+  // all the others are left alone for now.
+  for (dag_node_index op_index = 0; op_index < num_operators_in_dag_; ++op_index) {
+    if (!checkAllBlockingDependenciesMet(op_index)) {
+      continue;
+    }
+
+    query_dag_->getNodePayloadMutable(op_index)->informAllBlockingDependenciesMet();
+    processOperator(op_index, false);
+  }
+}
+
+// Finds the next schedulable normal WorkOrder proto and wraps it in a newly
+// allocated WorkOrderMessage. Returns nullptr if no operator has one available.
+// The caller takes ownership of the returned message.
+serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
+    const dag_node_index start_operator_index) {
+  // Default policy: scan the operators starting at 'start_operator_index',
+  // wrapping around at the end of the DAG, and visit each operator at most once.
+  size_t num_operators_checked = 0;
+  for (dag_node_index index = start_operator_index;
+       num_operators_checked < num_operators_in_dag_;
+       index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
+    if (query_exec_state_->hasExecutionFinished(index)) {
+      continue;
+    }
+    unique_ptr<serialization::WorkOrder> work_order_proto(
+        normal_workorder_protos_container_->getWorkOrderProto(index));
+    if (work_order_proto == nullptr) {
+      continue;
+    }
+
+    query_exec_state_->incrementNumQueuedWorkOrders(index);
+
+    unique_ptr<serialization::WorkOrderMessage> message_proto(new serialization::WorkOrderMessage);
+    message_proto->set_query_id(query_id_);
+    message_proto->set_operator_index(index);
+    // Transfer ownership of the WorkOrder proto to the message rather than
+    // deep-copying it with MergeFrom() and then destroying the original.
+    message_proto->set_allocated_work_order(work_order_proto.release());
+
+    return message_proto.release();
+  }
+  // No normal WorkOrder protos available right now.
+  return nullptr;
+}
+
+// Asks the operator at 'index' for its WorkOrder protos and reports whether
+// the number of pending protos for that operator grew as a result.
+bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
+  // An operator that has finished generating has nothing new to offer.
+  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+    return false;
+  }
+
+  // Do not fetch any work units until all blocking dependencies are met.
+  // The relational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  if (!checkAllBlockingDependenciesMet(index)) {
+    return false;
+  }
+
+  const size_t num_protos_before_fetch =
+      normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+
+  if (query_dag_->getNodePayloadMutable(index)
+          ->getAllWorkOrderProtos(normal_workorder_protos_container_.get())) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
+  }
+
+  // TODO(shoban): It would be a good check to see if operator is making
+  // useful progress, i.e., the operator either generates work orders to
+  // execute or still has pending work orders executing. However, this will not
+  // work if Foreman polls operators without feeding data. This check can be
+  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+  // pending work orders are completed or new input blocks feed.
+
+  return num_protos_before_fetch <
+         normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+}
+
+// Sends an InitiateRebuildMessage for the operator at 'index' to the Shiftboss
+// and marks the rebuild of that operator as initiated. Always returns false,
+// because the number of rebuild work orders is not known at this point.
+bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
+  DCHECK(checkRebuildRequired(index));
+  DCHECK(!checkRebuildInitiated(index));
+
+  const RelationalOperator &op = query_dag_->getNodePayload(index);
+  DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
+
+  serialization::InitiateRebuildMessage proto;
+  // 'query_id' is a required field of InitiateRebuildMessage. Leaving it unset
+  // would serialize an uninitialized message that the receiver cannot match
+  // to a query.
+  proto.set_query_id(query_id_);
+  proto.set_operator_index(index);
+  proto.set_insert_destination_index(op.getInsertDestinationID());
+  proto.set_relation_id(op.getOutputRelationID());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  // The buffer is freed right after the TaggedMessage is built, so the message
+  // is expected to hold its own copy of the serialized bytes.
+  TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes),
+                           proto_length,
+                           kInitiateRebuildMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+            << "') to Shiftboss";
+  // TODO(zuyu): Multiple workers support.
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftbosses_->getClientId(0),
+                                     move(tagged_msg));
+
+  // The negative value indicates that the number of rebuild work orders is to be
+  // determined.
+  query_exec_state_->setRebuildStatus(index, -1, true);
+
+  // Wait for Shiftbosses to report the number of rebuild work orders.
+  return false;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
new file mode 100644
index 0000000..8641c22
--- /dev/null
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -0,0 +1,105 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+
+#include <memory>
+
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class QueryHandle;
+class ShiftbossDirectory;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query in the distributed
+ *        version, including generation of new work orders (as WorkOrder
+ *        protos) and keeping track of the query execution state.
+ **/
+class QueryManagerDistributed final : public QueryManagerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_handle The QueryHandle object for this query.
+   * @param shiftbosses The ShiftbossDirectory to use.
+   * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param bus The TMB used for communication.
+   **/
+  QueryManagerDistributed(QueryHandle *query_handle,
+                          ShiftbossDirectory *shiftbosses,
+                          const tmb::client_id foreman_client_id,
+                          tmb::MessageBus *bus);
+
+  /**
+   * @brief Destructor. The raw pointers held by this class are not deleted.
+   **/
+  ~QueryManagerDistributed() override {}
+
+  /**
+   * @brief Fetch the normal WorkOrder protos of the given operator, unless it
+   *        is already done generating them or still has unmet blocking
+   *        dependencies.
+   *
+   * @param index The index of the operator in the query plan DAG.
+   *
+   * @return True if new WorkOrder protos were generated for the operator.
+   *         Otherwise false.
+   **/
+  bool fetchNormalWorkOrders(const dag_node_index index) override;
+
+  /**
+   * @brief Get the next normal workorder to be executed, wrapped in a
+   *        WorkOrderMessage proto.
+   *
+   * @param start_operator_index Begin the search for the schedulable WorkOrder
+   *        with the operator at this index.
+   *
+   * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
+   *         to be executed, return nullptr.
+   **/
+  serialization::WorkOrderMessage* getNextWorkOrderMessage(
+      const dag_node_index start_operator_index);
+
+ private:
+  // Normal execution of an operator is over when all of its dependencies are
+  // met, no WorkOrder proto of it is pending in the container, none of its
+  // WorkOrders is queued, and it is done generating WorkOrders.
+  bool checkNormalExecutionOver(const dag_node_index index) const override {
+    return (checkAllDependenciesMet(index) &&
+            !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
+            query_exec_state_->hasDoneGenerationWorkOrders(index));
+  }
+
+  // Sends an InitiateRebuildMessage for the given operator to a Shiftboss.
+  bool initiateRebuild(const dag_node_index index) override;
+
+  // The rebuild of an operator is over when it has been initiated and no
+  // rebuild WorkOrders remain for the operator.
+  bool checkRebuildOver(const dag_node_index index) const override {
+    return query_exec_state_->hasRebuildInitiated(index) &&
+           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  }
+
+  // The directory of Shiftbosses that messages are sent to.
+  ShiftbossDirectory *shiftbosses_;
+
+  // The TMB client ID used as the sender of the messages, and the bus they
+  // are sent on.
+  const tmb::client_id foreman_client_id_;
+  tmb::MessageBus *bus_;
+
+  // Holds the normal WorkOrder protos fetched from the operators until they
+  // are handed out by getNextWorkOrderMessage().
+  std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
+
+  DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cdecf0b5/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
new file mode 100644
index 0000000..abb5f32
--- /dev/null
+++ b/query_execution/ShiftbossDirectory.hpp
@@ -0,0 +1,181 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
+
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class which keeps the metadata about the shiftbosses.
+ *
+ * @note This class is intended to be used only by ForemanDistributed thread.
+ *       Therefore, none of the methods in this class are thread-safe.
+ **/
+class ShiftbossDirectory {
+ public:
+  /**
+   * @brief Constructor.
+  **/
+  ShiftbossDirectory() = default;
+
+  /**
+   * @brief Add the Shiftboss.
+   *
+   * @note Each Shiftboss must be added at most once: adding the same client ID
+   *       twice would leave the ID-to-index map out of sync with the
+   *       per-Shiftboss vectors.
+   *
+   * @param shiftboss_id the TMB client ID of Shiftboss thread.
+   * @param capacity The Work Order processing capacity of Shiftboss.
+   **/
+  void addShiftboss(const tmb::client_id shiftboss_id,
+                    const std::size_t capacity) {
+    const bool newly_added =
+        client_id_indices_.emplace(shiftboss_id, client_ids_.size()).second;
+    DCHECK(newly_added) << "The Shiftboss has already been added.";
+
+    client_ids_.push_back(shiftboss_id);
+    work_order_capacities_.push_back(capacity);
+    num_queued_work_orders_.push_back(0u);
+  }
+
+  /**
+   * @brief Whether the ShiftbossDirectory has any Shiftboss.
+   *
+   * @return True if no Shiftboss in ShiftbossDirectory. Otherwise false.
+   **/
+  bool empty() const {
+    DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
+    DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
+    DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
+
+    return client_ids_.empty();
+  }
+
+  /**
+   * @brief Get the number of Shiftboss in ShiftbossDirectory.
+   *
+   * @return The number of Shiftboss in ShiftbossDirectory.
+   **/
+  std::size_t size() const {
+    DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
+    DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
+    DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
+
+    return client_ids_.size();
+  }
+
+  /**
+   * @brief Get the TMB client ID of the specified Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return The TMB client ID of the given Shiftboss.
+   **/
+  tmb::client_id getClientId(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return client_ids_[shiftboss_index];
+  }
+
+  /**
+   * @brief Get the Shiftboss index from the specified client id.
+   *
+   * @param shiftboss_id The TMB client ID of Shiftboss.
+   *
+   * @return The index of the given Shiftboss.
+   **/
+  std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
+    const auto it = client_id_indices_.find(shiftboss_id);
+    DCHECK(it != client_id_indices_.end());
+
+    return it->second;
+  }
+
+  /**
+   * @brief Whether the given Shiftboss has reached its capacity.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   *
+   * @return True if reached the capacity. Otherwise false.
+   **/
+  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
+    DCHECK_LT(shiftboss_index, size());
+    return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
+  }
+
+  /**
+   * @brief Add the number of new work orders for the given Shiftboss.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   * @param num_new_work_orders The number of the new work orders that will be
+   *        executed on Shiftboss indexed by 'shiftboss_index'.
+   **/
+  void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
+                              const std::size_t num_new_work_orders) {
+    // Same bounds check as every other method that takes a Shiftboss index.
+    DCHECK_LT(shiftboss_index, size());
+    num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
+  }
+
+  /**
+   * @brief Increase the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    ++num_queued_work_orders_[shiftboss_index];
+  }
+
+  /**
+   * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
+   *
+   * @param shiftboss_index The index of Shiftboss.
+   **/
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
+    DCHECK_LT(shiftboss_index, size());
+    // The counter is unsigned, so decrementing at zero would wrap around.
+    DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
+    --num_queued_work_orders_[shiftboss_index];
+  }
+
+ private:
+  // The TMB client IDs of Shiftbosses.
+  // TODO(zuyu): Support deletions, as Shiftbosses go down.
+  std::vector<tmb::client_id> client_ids_;
+
+  // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
+  std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
+
+  // The max number of WorkOrders per Shiftboss.
+  std::vector<std::size_t> work_order_capacities_;
+
+  // The number of WorkOrders queued for execution per Shiftboss, and the value
+  // should be not greater than that of work_order_capacities_.
+  std::vector<std::size_t> num_queued_work_orders_;
+
+  DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_