You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:41 UTC

[17/30] incubator-quickstep git commit: Introduced Shiftboss for the distributed version.

Introduced Shiftboss for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7415ee87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7415ee87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7415ee87

Branch: refs/heads/quickstep-28-29
Commit: 7415ee87fa088054e09b2a9efb88389c43351b12
Parents: 9f9e3b7
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 22 13:29:03 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Wed Jul 27 15:06:13 2016 -0700

----------------------------------------------------------------------
 catalog/CatalogDatabaseCache.hpp             |   5 +
 query_execution/CMakeLists.txt               |  24 ++
 query_execution/QueryExecutionMessages.proto |  30 ++
 query_execution/QueryExecutionTypedefs.hpp   |   8 +
 query_execution/Shiftboss.cpp                | 360 ++++++++++++++++++++++
 query_execution/Shiftboss.hpp                | 241 +++++++++++++++
 storage/StorageManager.hpp                   |   1 +
 7 files changed, 669 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/catalog/CatalogDatabaseCache.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 77afe2a..b3e73a6 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -54,6 +54,11 @@ namespace serialization { class CatalogDatabase; }
 class CatalogDatabaseCache : public CatalogDatabaseLite {
  public:
   /**
+   * @brief Constructor.
+   **/
+  CatalogDatabaseCache() {}
+
+  /**
    * @brief Constructor. Reconstruct a database cache from its serialized
    *        Protocol Buffer form.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index f582ba5..8bf1ab1 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -52,6 +52,7 @@ if (ENABLE_DISTRIBUTED)
 endif()
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
   add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
 endif()
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
@@ -157,6 +158,8 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_utility_SortConfiguration_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_catalog_Catalog_proto
+                      quickstep_queryexecution_QueryContext_proto
                       quickstep_relationaloperators_WorkOrder_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionState
@@ -214,6 +217,26 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_utility_Macros
                       tmb)
 if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_Shiftboss
+                        glog
+                        quickstep_catalog_CatalogDatabaseCache
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryexecution_WorkerMessage
+                        quickstep_relationaloperators_RebuildWorkOrder
+                        quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_InsertDestination
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_Macros
+                        tmb)
   target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
                         quickstep_utility_Macros
                         tmb)
@@ -272,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
 endif()
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index fa20993..591ca6c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,8 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+import "catalog/Catalog.proto";
+import "query_execution/QueryContext.proto";
 import "relational_operators/WorkOrder.proto";
 
 // Used for any messages that do not carry payloads.
@@ -73,6 +75,25 @@ message WorkOrdersAvailableMessage {
 }
 
 // Distributed version related messages.
+message ShiftbossRegistrationMessage {
+  // The total Work Order processing capacity in Shiftboss, which equals the
+  // sum of the capacities of all workers managed by Shiftboss.
+  required uint64 work_order_capacity = 1;
+}
+
+message ShiftbossRegistrationResponseMessage {
+}
+
+message QueryInitiateMessage {
+  required uint64 query_id = 1;
+  required CatalogDatabase catalog_database_cache = 2;
+  required QueryContext query_context = 3;
+}
+
+message QueryInitiateResponseMessage {
+  required uint64 query_id = 1;
+}
+
 message WorkOrderMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
@@ -92,6 +113,15 @@ message InitiateRebuildResponseMessage {
   required uint64 num_rebuild_work_orders = 3;
 }
 
+message QueryResultRelationMessage {
+  required int32 relation_id = 1;
+  repeated fixed64 blocks = 2 [packed=true];
+}
+
+message QueryResultRelationResponseMessage {
+  required int32 relation_id = 1;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 61e76d7..d73d4ee 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,9 +73,17 @@ enum QueryExecutionMessageType : message_type_id {
   kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
+  kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
+  kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss.
+  kQueryInitiateMessage,  // From Foreman to Shiftboss.
+  kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
+
   kInitiateRebuildMessage,  // From Foreman to Shiftboss.
   kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryResultRelationMessage,  // From Foreman to Shiftboss.
+  kQueryResultRelationResponseMessage,  // From Shiftboss to Foreman.
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
new file mode 100644
index 0000000..af56306
--- /dev/null
+++ b/query_execution/Shiftboss.cpp
@@ -0,0 +1,360 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/Shiftboss.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "relational_operators/RebuildWorkOrder.hpp"
+#include "relational_operators/WorkOrderFactory.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class WorkOrder;
+
+// Thread entry point: handles each TMB message received by this Shiftboss
+// until a poison message arrives, which is rebroadcast to all workers.
+void Shiftboss::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the shiftboss thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, causing this thread to sleep until next
+    // message is received.
+    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') received the typed '" << annotated_message.tagged_message.message_type()
+              << "' message from client " << annotated_message.sender;
+    switch (annotated_message.tagged_message.message_type()) {
+      case kShiftbossRegistrationResponseMessage: {
+        foreman_client_id_ = annotated_message.sender;
+        break;
+      }
+      case kQueryInitiateMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryInitiateMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+        break;
+      }
+      case kWorkOrderMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::WorkOrderMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const std::size_t query_id = proto.query_id();
+        DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
+                                                                       &database_cache_,
+                                                                       query_contexts_[query_id].get(),
+                                                                       storage_manager_,
+                                                                       shiftboss_client_id_,
+                                                                       bus_);
+
+        unique_ptr<WorkerMessage> worker_message(WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
+
+        TaggedMessage worker_tagged_message(worker_message.get(), sizeof(*worker_message), kWorkOrderMessage);
+
+        const size_t worker_index = getSchedulableWorker();
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                  << "') from Foreman to worker " << worker_index;
+
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           workers_->getClientID(worker_index),
+                                           move(worker_tagged_message));
+        break;
+      }
+      case kInitiateRebuildMessage: {
+        // Construct rebuild work orders, and send back their number to
+        // 'ForemanDistributed'.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::InitiateRebuildMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processInitiateRebuildMessage(proto.query_id(),
+                                      proto.operator_index(),
+                                      proto.insert_destination_index(),
+                                      proto.relation_id());
+        break;
+      }
+      case kWorkOrderCompleteMessage:  // Fall through.
+      case kRebuildWorkOrderCompleteMessage:
+      case kDataPipelineMessage:
+      case kWorkOrdersAvailableMessage:
+      case kWorkOrderFeedbackMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+
+        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(annotated_message.tagged_message));
+        break;
+      }
+      case kQueryResultRelationMessage: {
+        // TODO(zuyu): Rename to kSaveQueryResultMessage.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryResultRelationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        for (int i = 0; i < proto.blocks_size(); ++i) {
+          const block_id block = proto.blocks(i);
+          storage_manager_->saveBlockOrBlob(block);
+          if (storage_manager_->blockOrBlobIsLoaded(block)) {
+            // NOTE(zuyu): eviction is required to avoid accesses to the query
+            // result relation schema in CatalogDatabaseCache, for all query
+            // optimizer execution generator unit tests and the single-process
+            // Quickstep CLI.
+            storage_manager_->evictBlockOrBlob(block);
+          }
+        }
+
+        serialization::QueryResultRelationResponseMessage ack_proto;
+        ack_proto.set_relation_id(proto.relation_id());
+
+        const size_t ack_proto_length = ack_proto.ByteSize();
+        char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
+        CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
+
+        TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
+                                  ack_proto_length,
+                                  kQueryResultRelationResponseMessage);
+        // NOTE(review): freed before the send, so TaggedMessage presumably copies the bytes.
+        free(ack_proto_bytes);
+
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
+                  << "') to Foreman";
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(ack_message));
+        break;
+      }
+      case kPoisonMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                  << "') from Foreman to all workers";
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        tmb::MessageBus::SendStatus send_status =
+            bus_->Send(shiftboss_client_id_,
+                       worker_addresses_,
+                       broadcast_style,
+                       move(annotated_message.tagged_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+        return;
+      }
+      default: {
+        LOG(FATAL) << "Unknown TMB message type";
+      }
+    }
+  }
+}
+
+// Round-robin scan, starting at 'start_worker_index_', for the first worker
+// whose queued WorkOrder count is below 'max_msgs_per_worker_'.
+size_t Shiftboss::getSchedulableWorker() {
+  const size_t worker_count = workers_->getNumWorkers();
+  size_t candidate = start_worker_index_;
+
+  // Keeps cycling through the workers until one has spare capacity.
+  while (workers_->getNumQueuedWorkOrders(candidate) >= max_msgs_per_worker_) {
+    candidate = (candidate + 1) % worker_count;
+  }
+  // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(candidate);
+  // But we need a WorkOrder queue first.
+  start_worker_index_ = (candidate + 1) % worker_count;
+  return candidate;
+}
+
+// Sends a ShiftbossRegistrationMessage, carrying the total WorkOrder capacity
+// of this Shiftboss, to an address marked with All(true).
+void Shiftboss::registerWithForeman() {
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+            << "') to all";
+
+  serialization::ShiftbossRegistrationMessage registration_proto;
+  registration_proto.set_work_order_capacity(getWorkOrderCapacity());
+
+  const size_t num_bytes = registration_proto.ByteSize();
+  char *buffer = static_cast<char*>(malloc(num_bytes));
+  CHECK(registration_proto.SerializeToArray(buffer, num_bytes));
+
+  TaggedMessage registration_message(static_cast<const void*>(buffer), num_bytes, kShiftbossRegistrationMessage);
+  free(buffer);
+
+  // A default-constructed style is used; no style options are set here.
+  tmb::MessageStyle default_style;
+  tmb::Address everyone;
+  everyone.All(true);
+
+  const tmb::MessageBus::SendStatus status =
+      bus_->Send(shiftboss_client_id_, everyone, default_style, move(registration_message));
+  DCHECK(status == tmb::MessageBus::SendStatus::kOK);
+}
+
+// Updates the catalog cache, builds and stores the QueryContext for
+// 'query_id', then sends a QueryInitiateResponseMessage to Foreman.
+void Shiftboss::processQueryInitiateMessage(
+    const std::size_t query_id,
+    const serialization::CatalogDatabase &catalog_database_cache_proto,
+    const serialization::QueryContext &query_context_proto) {
+  // Update the cache before constructing the QueryContext, which receives it.
+  database_cache_.update(catalog_database_cache_proto);
+
+  unique_ptr<QueryContext> context(new QueryContext(
+      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_));
+  query_contexts_.emplace(query_id, move(context));
+
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+            << "') to Foreman";
+
+  serialization::QueryInitiateResponseMessage response_proto;
+  response_proto.set_query_id(query_id);
+
+  const size_t response_length = response_proto.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_length));
+  CHECK(response_proto.SerializeToArray(response_bytes, response_length));
+
+  TaggedMessage response_message(static_cast<const void*>(response_bytes),
+                                 response_length,
+                                 kQueryInitiateResponseMessage);
+  // NOTE(review): freed before the send, so TaggedMessage presumably copies the bytes.
+  free(response_bytes);
+
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     shiftboss_client_id_,
+                                     foreman_client_id_,
+                                     move(response_message));
+}
+
+// Replies to Foreman with the number of partially filled blocks of the given
+// InsertDestination, then sends one RebuildWorkOrder per such block to workers.
+void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
+                                              const std::size_t op_index,
+                                              const QueryContext::insert_destination_id dest_index,
+                                              const relation_id rel_id) {
+  DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+
+  DCHECK_EQ(1u, query_contexts_.count(query_id));
+  InsertDestination *destination = query_contexts_[query_id]->getInsertDestination(dest_index);
+  DCHECK(destination != nullptr);
+
+  vector<MutableBlockReference> blocks_to_rebuild;
+  destination->getPartiallyFilledBlocks(&blocks_to_rebuild);
+
+  // Report the number of rebuild WorkOrders to Foreman before dispatching them.
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+            << "') to Foreman";
+
+  serialization::InitiateRebuildResponseMessage response_proto;
+  response_proto.set_query_id(query_id);
+  response_proto.set_operator_index(op_index);
+  response_proto.set_num_rebuild_work_orders(blocks_to_rebuild.size());
+
+  const size_t response_length = response_proto.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_length));
+  CHECK(response_proto.SerializeToArray(response_bytes, response_length));
+
+  TaggedMessage response_message(static_cast<const void*>(response_bytes),
+                                 response_length,
+                                 kInitiateRebuildResponseMessage);
+  free(response_bytes);
+
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     shiftboss_client_id_,
+                                     foreman_client_id_,
+                                     move(response_message));
+
+  for (MutableBlockReference &block_ref : blocks_to_rebuild) {
+    // NOTE(zuyu): Worker releases the memory after the execution of
+    // RebuildWorkOrder on the Worker.
+    WorkOrder *rebuild_order =
+        new RebuildWorkOrder(query_id, move(block_ref), op_index, rel_id, shiftboss_client_id_, bus_);
+
+    unique_ptr<WorkerMessage> rebuild_message(
+        WorkerMessage::RebuildWorkOrderMessage(rebuild_order, op_index));
+
+    // The payload is the raw bytes of the WorkerMessage object.
+    TaggedMessage rebuild_tagged_message(rebuild_message.get(),
+                                         sizeof(*rebuild_message),
+                                         kRebuildWorkOrderMessage);
+
+    const size_t target_worker = getSchedulableWorker();
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+              << "') to worker " << target_worker;
+
+    QueryExecutionUtil::SendTMBMessage(bus_,
+                                       shiftboss_client_id_,
+                                       workers_->getClientID(target_worker),
+                                       move(rebuild_tagged_message));
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
new file mode 100644
index 0000000..096ab74
--- /dev/null
+++ b/query_execution/Shiftboss.hpp
@@ -0,0 +1,241 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+
+#include "catalog/CatalogDatabaseCache.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+namespace serialization {
+class CatalogDatabase;
+class QueryContext;
+}  // namespace serialization
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Shiftboss accepts workorder protos from Foreman, and assigns
+ *        the workorders to workers.
+ **/
+class Shiftboss : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param storage_manager The StorageManager to use.
+   * @param workers A pointer to the WorkerDirectory.
+   * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
+   *
+   * @note If cpu_id is not specified, the Shiftboss thread may be moved
+   *       around on different CPUs by the OS.
+   **/
+  Shiftboss(tmb::MessageBus *bus,
+            StorageManager *storage_manager,
+            WorkerDirectory *workers,
+            const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        workers_(DCHECK_NOTNULL(workers)),
+        cpu_id_(cpu_id),
+        shiftboss_client_id_(tmb::kClientIdNone),
+        foreman_client_id_(tmb::kClientIdNone),
+        max_msgs_per_worker_(1),
+        start_worker_index_(0u) {
+    // Check to have at least one Worker.
+    DCHECK_GT(workers->getNumWorkers(), 0u);
+
+    shiftboss_client_id_ = bus_->Connect();
+    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
+    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
+
+    // Messages between Foreman and Shiftboss.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+
+    // Message sent to Worker.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
+
+    // Messages sent to Foreman.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
+    // Forward the following message types from Foreman to Workers.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
+
+    // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);
+
+    // Stop itself.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
+    // Stop all workers.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
+
+    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
+      worker_addresses_.AddRecipient(workers_->getClientID(i));
+    }
+
+    registerWithForeman();
+  }
+
+  ~Shiftboss() override {}
+
+  /**
+   * @brief Get the TMB client ID of Shiftboss thread.
+   *
+   * @return TMB client ID of shiftboss thread.
+   **/
+  inline tmb::client_id getBusClientID() const {
+    return shiftboss_client_id_;
+  }
+
+  /**
+   * @brief Get the Work Order processing capacity of all Workers managed by
+   *        Shiftboss during a single round of WorkOrder dispatch.
+   **/
+  inline std::size_t getWorkOrderCapacity() const {
+    DCHECK_NE(max_msgs_per_worker_, 0u);
+    return max_msgs_per_worker_ * workers_->getNumWorkers();
+  }
+
+  /**
+   * @brief Get the Worker to assign WorkOrders for execution. Block to wait if
+   *        all Workers have reached their capacity for queued WorkOrders.
+   **/
+  // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
+  // normal Work Order protos from Foreman and the generated rebuild Work Orders.
+  std::size_t getSchedulableWorker();
+
+  /**
+   * @brief Set the maximum number of messages that should be allocated to each
+   *        worker during a single round of WorkOrder dispatch.
+   *
+   * @param max_msgs_per_worker Maximum number of messages.
+   **/
+  inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
+    max_msgs_per_worker_ = max_msgs_per_worker;
+  }
+
+ protected:
+  /**
+   * @brief The shiftboss receives workorders, and based on the response it
+   *        assigns workorders to workers.
+   *
+   * @note  The workers who get the messages from the Shiftboss execute and
+   *        subsequently delete the WorkOrder contained in the message.
+   **/
+  void run() override;
+
+ private:
+  // Sends a ShiftbossRegistrationMessage carrying the WorkOrder capacity.
+  void registerWithForeman();
+
+  /**
+   * @brief Process the query initiate message and ack back.
+   *
+   * @param query_id The given query id.
+   * @param catalog_database_cache_proto The proto used to update
+   *        CatalogDatabaseCache.
+   * @param query_context_proto The QueryContext proto.
+   **/
+  void processQueryInitiateMessage(const std::size_t query_id,
+                                   const serialization::CatalogDatabase &catalog_database_cache_proto,
+                                   const serialization::QueryContext &query_context_proto);
+
+  /**
+   * @brief Process the RebuildWorkOrder initiate message and ack back.
+   *
+   * @param query_id The ID of the query to which this RebuildWorkOrder initiate
+   *        message belongs.
+   * @param op_index The index of the operator for rebuild work orders.
+   * @param dest_index The InsertDestination index in QueryContext to rebuild.
+   * @param rel_id The relation that needs to generate rebuild work orders.
+   **/
+  void processInitiateRebuildMessage(const std::size_t query_id,
+                                     const std::size_t op_index,
+                                     const QueryContext::insert_destination_id dest_index,
+                                     const relation_id rel_id);
+
+  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
+  // and Shiftboss and Worker thread pool.
+  tmb::MessageBus *bus_;
+
+  CatalogDatabaseCache database_cache_;
+  StorageManager *storage_manager_;
+  WorkerDirectory *workers_;
+
+  // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
+  const int cpu_id_;
+
+  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+
+  // TMB recipients for all workers managed by this Shiftboss.
+  tmb::Address worker_addresses_;
+
+  // During a single round of WorkOrder dispatch, a Worker should be allocated
+  // at most this many WorkOrders.
+  std::size_t max_msgs_per_worker_;
+
+  // The worker index for scheduling Work Order.
+  std::size_t start_worker_index_;
+
+  // QueryContexts per query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryContext>> query_contexts_;
+
+  DISALLOW_COPY_AND_ASSIGN(Shiftboss);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 50ddb0f..348018f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -619,6 +619,7 @@ class StorageManager {
   FRIEND_TEST(BlockLocatorTest, BlockTest);
   FRIEND_TEST(BlockLocatorTest, BlobTest);
 
+  friend class Shiftboss;
   FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
   FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);