You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:41 UTC

[18/51] [abbrv] [partial] incubator-quickstep git commit: remove c++ files

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
deleted file mode 100644
index fb9a9d6..0000000
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
-
-#include <cstddef>
-#include <unordered_map>
-#include <vector>
-
-#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
-#include "threading/ThreadIDBasedMap.hpp"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_style.h"
-#include "tmb/pure_memory_message_bus.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-typedef tmb::Address Address;
-typedef tmb::AnnotatedMessage AnnotatedMessage;
-typedef tmb::MessageBus MessageBus;
-typedef tmb::MessageStyle MessageStyle;
-typedef tmb::Priority Priority;
-typedef tmb::PureMemoryMessageBus<false> MessageBusImpl;
-typedef tmb::TaggedMessage TaggedMessage;
-typedef tmb::client_id client_id;
-typedef tmb::message_type_id message_type_id;
-
-using ClientIDMap = ThreadIDBasedMap<client_id,
-                                     'C',
-                                     'l',
-                                     'i',
-                                     'e',
-                                     'n',
-                                     't',
-                                     'I',
-                                     'D',
-                                     'M',
-                                     'a',
-                                     'p'>;
-
-// We sort the following message types in the order of a life cycle of a query.
-enum QueryExecutionMessageType : message_type_id {
-  kAdmitRequestMessage = 0,  // Requesting a query (or queries) to be admitted, from
-                             // the main thread to Foreman.
-  kWorkOrderMessage,  // From Foreman to Worker.
-  kWorkOrderCompleteMessage,  // From Worker to Foreman.
-  kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
-  kDataPipelineMessage,  // From InsertDestination or some WorkOrders to Foreman.
-  kWorkOrderFeedbackMessage,  // From some WorkOrders to Foreman on behalf of
-                              // their corresponding RelationalOperators.
-  kRebuildWorkOrderMessage,  // From Foreman to Worker.
-  kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
-  kWorkloadCompletionMessage,  // From Foreman to main thread.
-  kPoisonMessage,  // From the main thread to Foreman and Workers.
-
-#ifdef QUICKSTEP_DISTRIBUTED
-  kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
-  kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss, or from
-                                          // Shiftboss to Worker.
-  kQueryInitiateMessage,  // From Foreman to Shiftboss.
-  kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
-
-  kInitiateRebuildMessage,  // From Foreman to Shiftboss.
-  kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
-
-  kQueryTeardownMessage,  // From Foreman to Shiftboss.
-
-  kSaveQueryResultMessage,  // From Foreman to Shiftboss.
-  kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
-
-  // From Foreman to CLI.
-  kQueryExecutionSuccessMessage,
-
-  // BlockLocator related messages, sorted in a life cycle of StorageManager
-  // with a unique block domain.
-  kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
-  kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
-  kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
-  kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
-  kLocateBlockMessage,  // From StorageManager to BlockLocator.
-  kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
-  kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
-  kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
-  kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
-#endif
-};
-
-// WorkOrder profiling data structures.
-// Profiling record for an individual work order.
-struct WorkOrderTimeEntry {
-  std::size_t worker_id;
-  std::size_t operator_id;
-  std::size_t start_time;  // Epoch time measured in microseconds
-  std::size_t end_time;  // Epoch time measured in microseconds
-};
-// Key = query ID.
-// Value = vector of work order profiling records.
-typedef std::unordered_map<std::size_t, std::vector<WorkOrderTimeEntry>> WorkOrderTimeRecorder;
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
deleted file mode 100644
index b41965c..0000000
--- a/query_execution/QueryExecutionUtil.hpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-class QueryHandle;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A static class for reusable methods in query_execution module.
- **/
-class QueryExecutionUtil {
- public:
-  /**
-   * @brief Send a TMB message to a single receiver.
-   *
-   * @param bus A pointer to the TMB.
-   * @param sender_id The client ID of the sender.
-   * @param receiver_id The client ID of the receiver.
-   * @param tagged_message A moved-from reference to the tagged message.
-   *
-   * @return A status code indicating the result of the message getting sent.
-   *         The caller should ensure that the status is SendStatus::kOK.
-   **/
-  static tmb::MessageBus::SendStatus SendTMBMessage(
-      tmb::MessageBus *bus,
-      tmb::client_id sender_id,
-      tmb::client_id receiver_id,
-      tmb::TaggedMessage &&tagged_message) {  // NOLINT(whitespace/operators)
-    tmb::Address receiver_address;
-    receiver_address.AddRecipient(receiver_id);
-
-    tmb::MessageStyle single_receiver_style;
-    return bus->Send(sender_id,
-                     receiver_address,
-                     single_receiver_style,
-                     std::move(tagged_message));
-  }
-
-  /**
-   * @brief Construct and send an AdmitRequestMessage from a given sender to a
-   *        given recipient.
-   *
-   * @param sender_id The TMB client ID of the sender.
-   * @param receiver_id The TMB client ID of the receiver.
-   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
-   * @param bus A pointer to the TMB.
-   * @param tagged_message A moved from reference to the tagged message.
-   *
-   * @return A status code indicating the result of the message delivery.
-   *         The caller should ensure that the status is SendStatus::kOK.
-   **/
-  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
-      const tmb::client_id sender_id,
-      const tmb::client_id receiver_id,
-      QueryHandle *query_handle,
-      tmb::MessageBus *bus) {
-    std::unique_ptr<AdmitRequestMessage> request_message(
-        new AdmitRequestMessage(query_handle));
-    const std::size_t size_of_request_msg = sizeof(*request_message);
-    tmb::TaggedMessage admit_tagged_message(
-        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
-
-    return QueryExecutionUtil::SendTMBMessage(
-        bus, sender_id, receiver_id, std::move(admit_tagged_message));
-  }
-
-  /**
-   * @brief Receive a query completion message.
-   *
-   * @param receiver_id The TMB client ID of the receiver thread.
-   * @param bus A pointer to the TMB.
-   *
-   * @note Right now the query completion message is of no interest to the
-   *       caller. In the future, if this message needs to be fetched, make this
-   *       function return the TaggedMessage.
-   **/
-  static void ReceiveQueryCompletionMessage(const tmb::client_id receiver_id,
-                                            tmb::MessageBus *bus) {
-    const tmb::AnnotatedMessage annotated_msg =
-        bus->Receive(receiver_id, 0, true);
-    const tmb::TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
-  }
-
-  static void BroadcastMessage(const tmb::client_id sender_id,
-                               const tmb::Address &addresses,
-                               tmb::TaggedMessage &&tagged_message,  // NOLINT(whitespace/operators)
-                               tmb::MessageBus *bus) {
-    // The sender broadcasts the given message to all 'addresses'.
-    tmb::MessageStyle style;
-    style.Broadcast(true);
-
-    const tmb::MessageBus::SendStatus send_status =
-        bus->Send(sender_id, addresses, style, std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-  }
-
-  static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
-    // Terminate all threads.
-    // The sender thread broadcasts poison message to the workers and foreman.
-    // Each worker dies after receiving poison message. The order of workers'
-    // death is irrelavant.
-    tmb::MessageStyle style;
-    style.Broadcast(true);
-    tmb::Address address;
-    address.All(true);
-    tmb::TaggedMessage poison_tagged_message(kPoisonMessage);
-
-    DLOG(INFO) << "TMB client ID " << sender_id
-               << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
-    const tmb::MessageBus::SendStatus send_status = bus->Send(
-        sender_id, address, style, std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-  }
-
- private:
-  /**
-   * @brief Constructor. Made private to avoid instantiation.
-   **/
-  QueryExecutionUtil();
-
-  DISALLOW_COPY_AND_ASSIGN(QueryExecutionUtil);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
deleted file mode 100644
index 8e37da8..0000000
--- a/query_execution/QueryManagerBase.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerBase.hpp"
-
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "glog/logging.h"
-
-using std::pair;
-
-namespace quickstep {
-
-QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
-    : query_handle_(DCHECK_NOTNULL(query_handle)),
-      query_id_(query_handle->query_id()),
-      query_dag_(DCHECK_NOTNULL(
-          DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
-      num_operators_in_dag_(query_dag_->size()),
-      output_consumers_(num_operators_in_dag_),
-      blocking_dependencies_(num_operators_in_dag_),
-      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
-  for (dag_node_index node_index = 0;
-       node_index < num_operators_in_dag_;
-       ++node_index) {
-    const QueryContext::insert_destination_id insert_destination_index =
-        query_dag_->getNodePayload(node_index).getInsertDestinationID();
-    if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
-      // Rebuild is necessary whenever InsertDestination is present.
-      query_exec_state_->setRebuildRequired(node_index);
-      query_exec_state_->setRebuildStatus(node_index, 0, false);
-    }
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(node_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
-        // The link is not a pipeline-breaker. Streaming of blocks is possible
-        // between these two operators.
-        output_consumers_[node_index].push_back(dependent_op_index);
-      } else {
-        // The link is a pipeline-breaker. Streaming of blocks is not possible
-        // between these two operators.
-        blocking_dependencies_[dependent_op_index].push_back(node_index);
-      }
-    }
-  }
-}
-
-QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus(
-    const dag_node_index op_index) {
-  // As kQueryExecuted takes precedence over kOperatorExecuted, we first check
-  // whether the query has finished its execution.
-  if (query_exec_state_->hasQueryExecutionFinished()) {
-    return QueryStatusCode::kQueryExecuted;
-  }
-
-  if (query_exec_state_->hasExecutionFinished(op_index)) {
-    return QueryStatusCode::kOperatorExecuted;
-  }
-
-  return QueryStatusCode::kNone;
-}
-
-void QueryManagerBase::processFeedbackMessage(
-    const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) {
-  RelationalOperator *op =
-      query_dag_->getNodePayloadMutable(op_index);
-  op->receiveFeedbackMessage(msg);
-}
-
-void QueryManagerBase::processWorkOrderCompleteMessage(
-    const dag_node_index op_index) {
-  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
-  // Check if new work orders are available and fetch them if so.
-  fetchNormalWorkOrders(op_index);
-
-  if (checkRebuildRequired(op_index)) {
-    if (checkNormalExecutionOver(op_index)) {
-      if (!checkRebuildInitiated(op_index)) {
-        if (initiateRebuild(op_index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(op_index);
-        } else {
-          // Rebuild under progress.
-        }
-      } else if (checkRebuildOver(op_index)) {
-        // Rebuild was under progress and now it is over.
-        markOperatorFinished(op_index);
-      }
-    } else {
-      // Normal execution under progress for this operator.
-    }
-  } else if (checkOperatorExecutionOver(op_index)) {
-    // Rebuild not required for this operator and its normal execution is
-    // complete.
-    markOperatorFinished(op_index);
-  }
-
-  for (const pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      // Process the dependent operator (of the operator whose WorkOrder
-      // was just executed) for which all the dependencies have been met.
-      processOperator(dependent_op_index, true);
-    }
-  }
-}
-
-void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
-  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-
-  if (checkRebuildOver(op_index)) {
-    markOperatorFinished(op_index);
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(op_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-        processOperator(dependent_op_index, true);
-      }
-    }
-  }
-}
-
-void QueryManagerBase::processOperator(const dag_node_index index,
-                                       const bool recursively_check_dependents) {
-  if (fetchNormalWorkOrders(index)) {
-    // Fetched work orders. Return to wait for the generated work orders to
-    // execute, and skip the execution-finished checks.
-    return;
-  }
-
-  if (checkNormalExecutionOver(index)) {
-    if (checkRebuildRequired(index)) {
-      if (!checkRebuildInitiated(index)) {
-        // Rebuild hasn't started, initiate it.
-        if (initiateRebuild(index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(index);
-        } else {
-          // Rebuild WorkOrders have been generated.
-          return;
-        }
-      } else if (checkRebuildOver(index)) {
-        // Rebuild had been initiated and it is over.
-        markOperatorFinished(index);
-      }
-    } else {
-      // Rebuild is not required and normal execution over, mark finished.
-      markOperatorFinished(index);
-    }
-    // If we reach here, that means the operator has been marked as finished.
-    if (recursively_check_dependents) {
-      for (const pair<dag_node_index, bool> &dependent_link :
-           query_dag_->getDependents(index)) {
-        const dag_node_index dependent_op_index = dependent_link.first;
-        if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-          processOperator(dependent_op_index, true);
-        }
-      }
-    }
-  }
-}
-
-void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
-                                                  const block_id block,
-                                                  const relation_id rel_id) {
-  for (const dag_node_index consumer_index :
-       output_consumers_[op_index]) {
-    // Feed the streamed block to the consumer. Note that 'output_consumers_'
-    // only contain those dependents of operator with index = op_index which are
-    // eligible to receive streamed input.
-    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
-    // Because of the streamed input just fed, check if there are any new
-    // WorkOrders available and if so, fetch them.
-    fetchNormalWorkOrders(consumer_index);
-  }
-}
-
-void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
-  query_exec_state_->setExecutionFinished(index);
-
-  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
-  op->updateCatalogOnCompletion();
-
-  const relation_id output_rel = op->getOutputRelationID();
-  for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
-    // Signal dependent operator that current operator is done feeding input blocks.
-    if (output_rel >= 0) {
-      dependent_op->doneFeedingInputBlocks(output_rel);
-    }
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      dependent_op->informAllBlockingDependenciesMet();
-    }
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
deleted file mode 100644
index a274742..0000000
--- a/query_execution/QueryManagerBase.hpp
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-class QueryHandle;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A base class that manages the execution of a query including
- *        generation of new work orders, and keeping track of the query
- *        exection state.
- **/
-class QueryManagerBase {
- public:
-  typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
-
-  /**
-   * @brief Return codes for queryStatus() function.
-   *
-   * @note When both operator and query get executed, kQueryExecuted takes
-   *       precedence over kOperatorExecuted.
-   **/
-  enum class QueryStatusCode {
-    kOperatorExecuted = 0,  // An operator in the query finished execution.
-    kQueryExecuted,         // The query got executed.
-    kNone                   // None of the above.
-  };
-
-  /**
-   * @brief Constructor.
-   *
-   * @param query_handle The QueryHandle object for this query.
-   **/
-  explicit QueryManagerBase(QueryHandle *query_handle);
-
-  /**
-   * @brief Virtual destructor.
-   **/
-  virtual ~QueryManagerBase() {}
-
-  /**
-   * @brief Get the query handle.
-   **/
-  const QueryHandle* query_handle() const {
-    return query_handle_;
-  }
-
-  /**
-   * @brief Get the QueryExecutionState for this query.
-   **/
-  inline const QueryExecutionState& getQueryExecutionState() const {
-    return *query_exec_state_;
-  }
-
-  /**
-   * @brief Process the received WorkOrder complete message.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for the completed WorkOrder.
-   **/
-  void processWorkOrderCompleteMessage(const dag_node_index op_index);
-
-  /**
-   * @brief Process the received RebuildWorkOrder complete message.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for the completed RebuildWorkOrder.
-   **/
-  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
-
-  /**
-   * @brief Process the received data pipeline message.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for the pipelining block.
-   * @param block The block id.
-   * @param rel_id The ID of the relation that produced 'block'.
-   **/
-  void processDataPipelineMessage(const dag_node_index op_index,
-                                  const block_id block,
-                                  const relation_id rel_id);
-
-  /**
-   * @brief Fetch all work orders currently available in relational operator and
-   *        store them internally.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   *
-   * @return Whether any work order was generated by op.
-   **/
-  virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
-
-  /**
-   * @brief Process the received work order feedback message and notify
-   *        relational operator.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for the feedback message.
-   * @param message Feedback message from work order.
-   **/
-  void processFeedbackMessage(const dag_node_index op_index,
-                              const WorkOrder::FeedbackMessage &message);
-
-  /**
-   * @brief Get the query status after processing an incoming message.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for the incoming message.
-   *
-   * @return QueryStatusCode as determined after the message is processed.
-   **/
-  QueryStatusCode queryStatus(const dag_node_index op_index);
-
- protected:
-  /**
-   * @brief Process a current relational operator: Get its workorders and store
-   *        them in the WorkOrdersContainer for this query. If the operator can
-   *        be marked as done, do so.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   * @param recursively_check_dependents If an operator is done, should we
-   *        call processOperator on its dependents recursively.
-   **/
-  void processOperator(const dag_node_index index,
-                       const bool recursively_check_dependents);
-
-  /**
-   * @brief This function does the following things:
-   *        1. Mark the given relational operator as "done".
-   *        2. For all the dependents of this operator, check if all of their
-   *        blocking dependencies are met. If so inform them that the blocking
-   *        dependencies are met.
-   *        3. Check if the given operator is done producing output. If it's
-   *        done, inform the dependents that they won't receive input anymore
-   *        from the given operator.
-   *
-   * @param index The index of the given relational operator in the DAG.
-   **/
-  void markOperatorFinished(const dag_node_index index);
-
-  /**
-   * @brief Check if all the dependencies of the node at specified index have
-   *        finished their execution.
-   *
-   * @note This function's true return value is a pre-requisite for calling
-   *       getRebuildWorkOrders()
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the dependencies have finished their execution. False
-   *         otherwise.
-   **/
-  inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
-    for (const dag_node_index dependency_index :
-         query_dag_->getDependencies(node_index)) {
-      // If at least one of the dependencies is not met, return false.
-      if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if all the blocking dependencies of the node at specified
-   *        index have finished their execution.
-   *
-   * @note A blocking dependency is the one which is pipeline breaker. Output of
-   *       a dependency can't be streamed to its dependent if the link between
-   *       them is pipeline breaker.
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the blocking dependencies have finished their
-   *         execution. False otherwise.
-   **/
-  inline bool checkAllBlockingDependenciesMet(
-      const dag_node_index node_index) const {
-    for (const dag_node_index blocking_dependency_index :
-         blocking_dependencies_[node_index]) {
-      if (!query_exec_state_->hasExecutionFinished(
-              blocking_dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if the execution of the given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkOperatorExecutionOver(const dag_node_index index) const {
-    return this->checkNormalExecutionOver(index) &&
-           (!checkRebuildRequired(index) || this->checkRebuildOver(index));
-  }
-
-  /**
-   * @brief Check if the rebuild operation is required for a given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is required, false otherwise.
-   **/
-  inline bool checkRebuildRequired(const dag_node_index index) const {
-    return query_exec_state_->isRebuildRequired(index);
-  }
-
-  /**
-   * @brief Check if the rebuild operation for a given operator has been
-   *        initiated.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation has been initiated, false otherwise.
-   **/
-  inline bool checkRebuildInitiated(const dag_node_index index) const {
-    return query_exec_state_->hasRebuildInitiated(index);
-  }
-
-  const QueryHandle *query_handle_;
-
-  const std::size_t query_id_;
-
-  DAG<RelationalOperator, bool> *query_dag_;  // Owned by 'query_handle_'.
-  const dag_node_index num_operators_in_dag_;
-
-  // For all nodes, store their receiving dependents.
-  std::vector<std::vector<dag_node_index>> output_consumers_;
-
-  // For all nodes, store their pipeline breaking dependencies (if any).
-  std::vector<std::vector<dag_node_index>> blocking_dependencies_;
-
-  std::unique_ptr<QueryExecutionState> query_exec_state_;
-
- private:
-  /**
-   * @brief Check if the given operator's normal execution is over.
-   *
-   * @note The conditions for a given operator's normal execution to get over:
-   *       1. All of its  normal (i.e. non rebuild) WorkOrders have finished
-   *       execution.
-   *       2. The operator is done generating work orders.
-   *       3. All of the dependencies of the given operator have been met.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the normal execution of the given operator is over, false
-   *         otherwise.
-   **/
-  virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0;
-
-  /**
-   * @brief Initiate the rebuild process for partially filled blocks generated
-   *        during the execution of the given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild is over immediately, i.e. the operator didn't
-   *         generate any rebuild WorkOrders, false otherwise.
-   **/
-  virtual bool initiateRebuild(const dag_node_index index) = 0;
-
-  /**
-   * @brief Check if the rebuild operation for a given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is over, false otherwise.
-   **/
-  virtual bool checkRebuildOver(const dag_node_index index) const = 0;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
deleted file mode 100644
index 7d45933..0000000
--- a/query_execution/QueryManagerDistributed.cpp
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerDistributed.hpp"
-
-#include <cstddef>
-#include <cstdlib>
-#include <memory>
-#include <utility>
-
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
-#include "query_execution/WorkOrderProtosContainer.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.pb.h"
-#include "utility/DAG.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-
-namespace quickstep {
-
-QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
-                                                 const ShiftbossDirectory *shiftboss_directory,
-                                                 const tmb::client_id foreman_client_id,
-                                                 tmb::MessageBus *bus)
-    : QueryManagerBase(query_handle),
-      shiftboss_directory_(shiftboss_directory),
-      foreman_client_id_(foreman_client_id),
-      bus_(bus),
-      normal_workorder_protos_container_(
-          new WorkOrderProtosContainer(num_operators_in_dag_)) {
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
-    }
-  }
-}
-
-serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
-    const dag_node_index start_operator_index) {
-  // Default policy: Operator with lowest index first.
-  size_t num_operators_checked = 0;
-  for (dag_node_index index = start_operator_index;
-       num_operators_checked < num_operators_in_dag_;
-       index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
-    }
-    unique_ptr<serialization::WorkOrder> work_order_proto(
-        normal_workorder_protos_container_->getWorkOrderProto(index));
-    if (work_order_proto != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-
-      unique_ptr<serialization::WorkOrderMessage> message_proto(new serialization::WorkOrderMessage);
-      message_proto->set_query_id(query_id_);
-      message_proto->set_operator_index(index);
-      message_proto->mutable_work_order()->MergeFrom(*work_order_proto);
-
-      return message_proto.release();
-    }
-  }
-  // No normal WorkOrder protos available right now.
-  return nullptr;
-}
-
-bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorder_protos = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorder_protos_before =
-        normal_workorder_protos_container_->getNumWorkOrderProtos(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)
-            ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorder_protos =
-        (num_pending_workorder_protos_before <
-         normal_workorder_protos_container_->getNumWorkOrderProtos(index));
-  }
-  return generated_new_workorder_protos;
-}
-
-void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                                                    const std::size_t num_rebuild_work_orders) {
-  // TODO(zuyu): Multiple Shiftbosses support.
-  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
-
-  if (num_rebuild_work_orders != 0u) {
-    // Wait for the rebuild work orders to finish.
-    return;
-  }
-
-  // No needs for rebuilds.
-  markOperatorFinished(op_index);
-
-  for (const std::pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      processOperator(dependent_op_index, true);
-    }
-  }
-}
-
-bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
-  DCHECK(checkRebuildRequired(index));
-  DCHECK(!checkRebuildInitiated(index));
-
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-  DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
-
-  serialization::InitiateRebuildMessage proto;
-  proto.set_query_id(query_id_);
-  proto.set_operator_index(index);
-  proto.set_insert_destination_index(op.getInsertDestinationID());
-  proto.set_relation_id(op.getOutputRelationID());
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes),
-                           proto_length,
-                           kInitiateRebuildMessage);
-  free(proto_bytes);
-
-  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
-            << "') to Shiftboss";
-  // TODO(zuyu): Multiple workers support.
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     foreman_client_id_,
-                                     shiftboss_directory_->getClientId(0),
-                                     move(tagged_msg));
-
-  // The negative value indicates that the number of rebuild work orders is to be
-  // determined.
-  query_exec_state_->setRebuildStatus(index, -1, true);
-
-  // Wait for Shiftbosses to report the number of rebuild work orders.
-  return false;
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
deleted file mode 100644
index e609ab8..0000000
--- a/query_execution/QueryManagerDistributed.hpp
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
-
-#include <cstddef>
-#include <memory>
-
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkOrderProtosContainer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class QueryHandle;
-class ShiftbossDirectory;
-
-namespace serialization { class WorkOrderMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that manages the execution of a query including generation
- *        of new work orders, keeping track of the query exection state.
- **/
-class QueryManagerDistributed final : public QueryManagerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param query_handle The QueryHandle object for this query.
-   * @param shiftboss_directory The ShiftbossDirectory to use.
-   * @param foreman_client_id The TMB client ID of the foreman thread.
-   * @param bus The TMB used for communication.
-   **/
-  QueryManagerDistributed(QueryHandle *query_handle,
-                          const ShiftbossDirectory *shiftboss_directory,
-                          const tmb::client_id foreman_client_id,
-                          tmb::MessageBus *bus);
-
-  ~QueryManagerDistributed() override {}
-
-  bool fetchNormalWorkOrders(const dag_node_index index) override;
-
-  /**
-   * @brief Process the initiate rebuild work order response message.
-   *
-   * @param op_index The index of the specified operator node in the query DAG
-   *        for initiating the rebuild work order.
-   * @param num_rebuild_work_orders The number of the rebuild work orders
-   *        generated for the operator indexed by 'op_index'.
-   **/
-  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                             const std::size_t num_rebuild_work_orders);
-
-  /**
-   * @brief Get the next normal workorder to be excuted, wrapped in a
-   *        WorkOrderMessage proto.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   *
-   * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
-   *         to be executed, return NULL.
-   **/
-  serialization::WorkOrderMessage* getNextWorkOrderMessage(
-      const dag_node_index start_operator_index);
-
- private:
-  bool checkNormalExecutionOver(const dag_node_index index) const override {
-    return (checkAllDependenciesMet(index) &&
-            !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
-            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
-            query_exec_state_->hasDoneGenerationWorkOrders(index));
-  }
-
-  bool initiateRebuild(const dag_node_index index) override;
-
-  bool checkRebuildOver(const dag_node_index index) const override {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  }
-
-  const ShiftbossDirectory *shiftboss_directory_;
-
-  const tmb::client_id foreman_client_id_;
-  tmb::MessageBus *bus_;
-
-  std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
deleted file mode 100644
index 237796f..0000000
--- a/query_execution/QueryManagerSingleNode.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerSingleNode.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlock.hpp"
-#include "utility/DAG.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace quickstep {
-
-class WorkOrder;
-
-QueryManagerSingleNode::QueryManagerSingleNode(
-    const tmb::client_id foreman_client_id,
-    const std::size_t num_numa_nodes,
-    QueryHandle *query_handle,
-    CatalogDatabaseLite *catalog_database,
-    StorageManager *storage_manager,
-    tmb::MessageBus *bus)
-    : QueryManagerBase(query_handle),
-      foreman_client_id_(foreman_client_id),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)),
-      bus_(DCHECK_NOTNULL(bus)),
-      query_context_(new QueryContext(query_handle->getQueryContextProto(),
-                                      *catalog_database,
-                                      storage_manager_,
-                                      foreman_client_id_,
-                                      bus_)),
-      workorders_container_(
-          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
-    }
-  }
-}
-
-WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
-    const dag_node_index start_operator_index, const numa_node_id numa_node) {
-  // Default policy: Operator with lowest index first.
-  WorkOrder *work_order = nullptr;
-  size_t num_operators_checked = 0;
-  for (dag_node_index index = start_operator_index;
-       num_operators_checked < num_operators_in_dag_;
-       index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
-    }
-    if (numa_node != kAnyNUMANodeID) {
-      // First try to get a normal WorkOrder from the specified NUMA node.
-      work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
-      if (work_order != nullptr) {
-        // A WorkOrder found on the given NUMA node.
-        query_exec_state_->incrementNumQueuedWorkOrders(index);
-        return WorkerMessage::WorkOrderMessage(work_order, index);
-      } else {
-        // Normal workorder not found on this node. Look for a rebuild workorder
-        // on this NUMA node.
-        work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
-        if (work_order != nullptr) {
-          return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-        }
-      }
-    }
-    // Either no workorder found on the given NUMA node, or numa_node is
-    // 'kAnyNUMANodeID'.
-    // Try to get a normal WorkOrder from other NUMA nodes.
-    work_order = workorders_container_->getNormalWorkOrder(index);
-    if (work_order != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-      return WorkerMessage::WorkOrderMessage(work_order, index);
-    } else {
-      // Normal WorkOrder not found, look for a RebuildWorkOrder.
-      work_order = workorders_container_->getRebuildWorkOrder(index);
-      if (work_order != nullptr) {
-        return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-      }
-    }
-  }
-  // No WorkOrders available right now.
-  return nullptr;
-}
-
-bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorders = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorders_before =
-        workorders_container_->getNumNormalWorkOrders(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
-                                                                   query_context_.get(),
-                                                                   storage_manager_,
-                                                                   foreman_client_id_,
-                                                                   bus_);
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorders =
-        (num_pending_workorders_before <
-         workorders_container_->getNumNormalWorkOrders(index));
-  }
-  return generated_new_workorders;
-}
-
-bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
-  DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
-  DCHECK(checkRebuildRequired(index));
-  DCHECK(!checkRebuildInitiated(index));
-
-  getRebuildWorkOrders(index, workorders_container_.get());
-
-  query_exec_state_->setRebuildStatus(
-      index, workorders_container_->getNumRebuildWorkOrders(index), true);
-
-  return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-}
-
-void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
-                                                  WorkOrdersContainer *container) {
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-  const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
-
-  if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
-    return;
-  }
-
-  std::vector<MutableBlockReference> partially_filled_block_refs;
-
-  DCHECK(query_context_ != nullptr);
-  InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
-  DCHECK(insert_destination != nullptr);
-
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
-  for (std::vector<MutableBlockReference>::size_type i = 0;
-       i < partially_filled_block_refs.size();
-       ++i) {
-    container->addRebuildWorkOrder(
-        new RebuildWorkOrder(query_id_,
-                             std::move(partially_filled_block_refs[i]),
-                             index,
-                             op.getOutputRelationID(),
-                             foreman_client_id_,
-                             bus_),
-        index);
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
deleted file mode 100644
index dd044a5..0000000
--- a/query_execution/QueryManagerSingleNode.hpp
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
-
-#include <cstddef>
-#include <memory>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that manages the execution of a query including generation
- *        of new work orders, keeping track of the query exection state.
- **/
-class QueryManagerSingleNode final : public QueryManagerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the foreman thread.
-   * @param num_numa_nodes The number of NUMA nodes used by the system.
-   * @param query_handle The QueryHandle object for this query.
-   * @param catalog_database The CatalogDatabse used by the query.
-   * @param storage_manager The StorageManager used by the query.
-   * @param bus The TMB used for communication.
-   **/
-  QueryManagerSingleNode(const tmb::client_id foreman_client_id,
-                         const std::size_t num_numa_nodes,
-                         QueryHandle *query_handle,
-                         CatalogDatabaseLite *catalog_database,
-                         StorageManager *storage_manager,
-                         tmb::MessageBus *bus);
-
-  ~QueryManagerSingleNode() override {}
-
-  bool fetchNormalWorkOrders(const dag_node_index index) override;
-
- /**
-   * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   * @param numa_node The next WorkOrder should preferably have its input(s)
-   *        from this numa_node. This is a hint and not a binding requirement.
-   *
-   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
-   *         executed, return NULL.
-   **/
-  WorkerMessage* getNextWorkerMessage(
-      const dag_node_index start_operator_index,
-      const numa_node_id node_id = kAnyNUMANodeID);
-
-  /**
-   * @brief Get a pointer to the QueryContext.
-   **/
-  inline QueryContext* getQueryContextMutable() {
-    return query_context_.get();
-  }
-
- private:
-  bool checkNormalExecutionOver(const dag_node_index index) const override {
-    return (checkAllDependenciesMet(index) &&
-            !workorders_container_->hasNormalWorkOrder(index) &&
-            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
-            query_exec_state_->hasDoneGenerationWorkOrders(index));
-  }
-
-  bool initiateRebuild(const dag_node_index index) override;
-
-  bool checkRebuildOver(const dag_node_index index) const override {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           !workorders_container_->hasRebuildWorkOrder(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  }
-
-  /**
-   * @brief Get the rebuild WorkOrders for an operator.
-   *
-   * @note This function should be called only once, when all the normal
-   *       WorkOrders generated by an operator finish their execution.
-   *
-   * @param index The index of the operator in the query plan DAG.
-   * @param container A pointer to a WorkOrdersContainer to be used to store the
-   *        generated WorkOrders.
-   **/
-  void getRebuildWorkOrders(const dag_node_index index,
-                            WorkOrdersContainer *container);
-
-  const tmb::client_id foreman_client_id_;
-
-  StorageManager *storage_manager_;
-  tmb::MessageBus *bus_;
-
-  std::unique_ptr<QueryContext> query_context_;
-
-  std::unique_ptr<WorkOrdersContainer> workorders_container_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/README.md
----------------------------------------------------------------------
diff --git a/query_execution/README.md b/query_execution/README.md
deleted file mode 100644
index 12e0f57..0000000
--- a/query_execution/README.md
+++ /dev/null
@@ -1,149 +0,0 @@
-# An Overview of Quickstep's Execution Engine
-
-## Types of threads
-There are two kinds of threads in Quickstep - Foreman and Worker. The foreman
-thread controls the query execution progress, finds schedulable work (called as
-WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
-The Worker threads receive the WorkOrders and execute them. After execution they
-send a completion message (or response message) back to Foreman.
-
-## High level functionality of Foreman
-Foreman requests all the RelationalOperators in the physical query plan
-represented as a DAG to give any schedulable work (in the form of WorkOrders).
-While doing so, Foreman has to respect dependencies between operators. There are
-two kinds of dependencies between operators - pipeline breaking (or blocking)
-and pipeline non-breaking (or non-blocking). In the first case, the output of
-the producer operator can't be pipelined to the consumer operator. In the second
-case, the Foreman will facilitate the pipelining of the intermediate output
-produced by the producer operator to the consumer operator.
-
-## Messages in execution engine
-
-### WorkerMessage
-There are multiple types of WorkerMessage, each of which indicates the purpose
-of the message.
-
-Foreman -> Worker : WorkerMessage which consists of the following things
-- A pointer to the WorkOrder to be executed. The WorkOrder could be a normal
-WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of
-WorkOrder::execute() method which is overriden by all of the RelationalOperator
-classes. A rebuild WorkOrder has one StorageBlock as input and calls a
-rebuild() method on the block. More details about rebuild() can be found in the
-storage module.
-- The index of the relational operator in the query plan DAG that produced the
-WorkOrder.
-
-### ForemanMessage
-Multiple senders are possible for this message. There are multiple types of
-ForemanMessages, each of which indicates the purpose of the message.
-
-Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
-RebuildCompletion are sent after a Worker finishes executing a respective type
-of WorkOrder. This message helps the Foreman track the progress of individual
-operators as well as the whole query.
-
-Some relational operators and InsertDestination -> Foreman : ForemanMessage of
-types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
-when an output block of a relational operator gets full. Once a block is full,
-it streams the unique block ID of the filled block along with the index of the
-relational operator that produced the block to Foreman with the message type
-DataPipeline. Some operators which modify the block in place also send similar
-messages to Foreman.
-
-### FeedbackMessage
-This message is sent from Workers to the Foreman during a WorkOrder execution.
-
-In certain operators, e.g. TextScan (used for bulk loading data from text files)
-and Sort, there is a communication between the relational operator and its
-WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
-thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman.
-Foreman relays this message to the relational operator that produced the sender
-WorkOrder. The relational operator uses this message to update its internal
-state to potentially generate newer WorkOrders.
-
-### PoisonMessage
-This message is used to terminate a thread (i.e., Foreman and Worker), typically
-when shutting down the Quickstep process.
-
-## How does the Foreman react after receiving various messages?
-### WorkOrder completion message
-* Update the book-keeping of pending WorkOrders per Worker and per operator.
-* Fetch new WorkOrders if available for the operator of whose WorkOrder was
-just executed.
-* Update the state of an operator - the possible options are:
-  - Normal WorkOrders are still under execution
-  - All normal WorkOrders have finished execution and rebuild WorkOrders are yet
-  to be generated.
-  - All normal WorkOrders have finished execution, rebuild WorkOrders have been
-  generated and issued to Workers.
-  - All normal and rebuild WorkOrders have been executed AND all the dependency
-  operators for the given operator have finished execution, therefore the given
-  operator has finished its execution.
-* Fetch the WorkOrders from the dependents of the given operator.
-
-### Rebuild WorkOrder completion message
-* Update the book-keeping of pending WorkOrders per Worker and per operator.
-* If all the rebuild WorkOrders have finished their execution, try to fetch the
-WorkOrders of the dependent operators of the operator whose rebuild WorkOrder
-was just executed.
-
-### Data pipeline message
-* Find the consumer operators (i.e. operators which have a non
-pipeline-breaking link) of the producer operator.
-* Stream the block ID to the eligible consumer operators.
-* Fetch new WorkOrders from these consumer operators which may have become
-available because of the streaming of data.
-
-### WorkOrder available message
-* Fetch new WorkOrders that may have become available.
-
-### Feedback message
-* Relay the feedback message to a specified relational operator. The recipient
-operator is specified in the header of the message.
-
-## Example
-We look at a sample query to better describe the flow of messages -
-
-SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;
-
-This is an equi-join query which can be implemented using a hash join. We assume
-that S is a larger relation and the build relation is the output of the
-selection on R.
-
-The query execution plan involves the following operators:
-* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
-R')
-* BuildHashOperator to construct a hash table on R'
-* HashJoinOperator to probe the hash table, where the probe relation is S
-* DestroyHashTableOperator to destroy the hash table after the join is done
-* Multiple DropTableOperators to destroy the temporaray relations produced as
-output.
-
-R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4.
-We assume that the SelectOperator produces one filled block and one partially
-filled block as output. Note that in the query plan DAG, the link between
-SelectOperator and BuildHashOperator allows streaming of data. The
-HashJoinOperator's WorkOrder can't be generated unless all of the
-BuildHashOperator's WorkOrders have finished their execution. The execution is
-assumed to be performed by a single Worker thread.
-
-The following table describes the message exchange that happens during the
-query excution. We primarily focus on three operators - Select, BuildHash and
-HashJoin (probe).
-
-| Sender | Receiver | Message  | Message Description |
-|:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 1. |
-| InsertDestination | Foreman | ForemanMessage of type kDataPipeline | SelectWorkOrder on block 1 produced one fully filled block as output.  The output block ID as pipelined from the InsertDestination to Foreman.  Foreman relays this block ID to BuildHashOperator, which generates a WorkOrder which is ready to be scheduled. |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 1 completed. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 2. |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 2 completed.  As a result of this execution, a partially filled block of output was produced.  |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the fully filled block of R'  |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete.  |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the partially filled block of R' |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 3 from S |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 4 from S |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |
-

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
deleted file mode 100644
index ed4bade..0000000
--- a/query_execution/Shiftboss.cpp
+++ /dev/null
@@ -1,400 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/Shiftboss.hpp"
-
-#include <cstddef>
-#include <cstdlib>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/WorkOrderFactory.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageManager.hpp"
-#include "threading/ThreadUtil.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::size_t;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::AnnotatedMessage;
-using tmb::MessageBus;
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-class WorkOrder;
-
-void Shiftboss::run() {
-  if (cpu_id_ >= 0) {
-    // We can pin the shiftboss thread to a CPU if specified.
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  processShiftbossRegistrationResponseMessage();
-
-  for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until next
-    // message is received.
-    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') received the typed '" << annotated_message.tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-    switch (annotated_message.tagged_message.message_type()) {
-      case kQueryInitiateMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryInitiateMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
-        break;
-      }
-      case kWorkOrderMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::WorkOrderMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        DCHECK_EQ(1u, query_contexts_.count(query_id));
-
-        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
-                                                                       shiftboss_index_,
-                                                                       &database_cache_,
-                                                                       query_contexts_[query_id].get(),
-                                                                       storage_manager_,
-                                                                       shiftboss_client_id_,
-                                                                       bus_);
-
-        unique_ptr<WorkerMessage> worker_message(
-            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
-        TaggedMessage worker_tagged_message(worker_message.get(),
-                                            sizeof(*worker_message),
-                                            kWorkOrderMessage);
-
-        const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                   << "') from Foreman to worker " << worker_index;
-
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               workers_->getClientID(worker_index),
-                                               move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kInitiateRebuildMessage: {
-        // Construct rebuild work orders, and send back their number to
-        // 'ForemanDistributed'.
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::InitiateRebuildMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processInitiateRebuildMessage(proto.query_id(),
-                                      proto.operator_index(),
-                                      proto.insert_destination_index(),
-                                      proto.relation_id());
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through.
-      case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrderCompleteMessage:
-      case kRebuildWorkOrderCompleteMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                   << "' message from Worker with TMB client ID '" << annotated_message.sender
-                   << "' to Foreman with TMB client ID " << foreman_client_id_;
-
-        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kQueryTeardownMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryTeardownMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        query_contexts_.erase(proto.query_id());
-        break;
-      }
-      case kSaveQueryResultMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::SaveQueryResultMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        for (int i = 0; i < proto.blocks_size(); ++i) {
-          storage_manager_->saveBlockOrBlob(proto.blocks(i));
-        }
-
-        // Clean up query execution states, i.e., QueryContext.
-        query_contexts_.erase(proto.query_id());
-
-        serialization::SaveQueryResultResponseMessage proto_response;
-        proto_response.set_query_id(proto.query_id());
-        proto_response.set_relation_id(proto.relation_id());
-        proto_response.set_cli_id(proto.cli_id());
-        proto_response.set_shiftboss_index(shiftboss_index_);
-
-        const size_t proto_response_length = proto_response.ByteSize();
-        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                       proto_response_length,
-                                       kSaveQueryResultResponseMessage);
-        free(proto_response_bytes);
-
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                   << "') to Foreman with TMB client ID " << foreman_client_id_;
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                   << "') from Foreman to all workers";
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(shiftboss_client_id_,
-                       worker_addresses_,
-                       broadcast_style,
-                       move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        return;
-      }
-      default: {
-        LOG(FATAL) << "Unknown TMB message type";
-      }
-    }
-  }
-}
-
-size_t Shiftboss::getSchedulableWorker() {
-  const size_t num_workers = workers_->getNumWorkers();
-
-  size_t curr_worker = start_worker_index_;
-  for (;;) {
-    if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) {
-      start_worker_index_ = (curr_worker + 1) % num_workers;
-      // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(curr_worker);
-      // But we need a WorkOrder queue first.
-      return curr_worker;
-    }
-
-    curr_worker = (curr_worker + 1) % num_workers;
-  }
-}
-
-void Shiftboss::registerWithForeman() {
-  tmb::Address all_addresses;
-  all_addresses.All(true);
-
-  tmb::MessageStyle style;
-
-  serialization::ShiftbossRegistrationMessage proto;
-  proto.set_work_order_capacity(getWorkOrderCapacity());
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kShiftbossRegistrationMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
-             << "') to all";
-  tmb::MessageBus::SendStatus send_status =
-      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
-  DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-}
-
-void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
-
-  foreman_client_id_ = annotated_message.sender;
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-             << "') received the typed '" << kShiftbossRegistrationResponseMessage
-             << "' message from ForemanDistributed with client " << foreman_client_id_;
-
-  serialization::ShiftbossRegistrationResponseMessage proto;
-  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-  shiftboss_index_ = proto.shiftboss_index();
-
-  // Forward this message to Workers regarding <shiftboss_index_>.
-  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
-                                       worker_addresses_,
-                                       move(annotated_message.tagged_message),
-                                       bus_);
-}
-
-void Shiftboss::processQueryInitiateMessage(
-    const std::size_t query_id,
-    const serialization::CatalogDatabase &catalog_database_cache_proto,
-    const serialization::QueryContext &query_context_proto) {
-  database_cache_.update(catalog_database_cache_proto);
-
-  auto query_context = std::make_unique<QueryContext>(
-      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
-  query_contexts_.emplace(query_id, move(query_context));
-
-  serialization::QueryInitiateResponseMessage proto;
-  proto.set_query_id(query_id);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message_response(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kQueryInitiateResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-             << "') to Foreman with TMB client ID " << foreman_client_id_;
-  const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
-                                         foreman_client_id_,
-                                         move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK);
-}
-
-void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
-                                              const std::size_t op_index,
-                                              const QueryContext::insert_destination_id dest_index,
-                                              const relation_id rel_id) {
-  DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-
-  DCHECK_EQ(1u, query_contexts_.count(query_id));
-  InsertDestination *insert_destination = query_contexts_[query_id]->getInsertDestination(dest_index);
-  DCHECK(insert_destination != nullptr);
-
-  vector<MutableBlockReference> partially_filled_block_refs;
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
-  serialization::InitiateRebuildResponseMessage proto;
-  proto.set_query_id(query_id);
-  proto.set_operator_index(op_index);
-  proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
-  proto.set_shiftboss_index(shiftboss_index_);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message_response(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kInitiateRebuildResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
-             << "') to Foreman with TMB client ID " << foreman_client_id_;
-  const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
-                                         foreman_client_id_,
-                                         move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK);
-
-  for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
-    // NOTE(zuyu): Worker releases the memory after the execution of
-    // RebuildWorkOrder on the Worker.
-    WorkOrder *rebuild_work_order =
-        new RebuildWorkOrder(query_id,
-                             move(partially_filled_block_refs[i]),
-                             op_index,
-                             rel_id,
-                             shiftboss_client_id_,
-                             bus_);
-
-    unique_ptr<WorkerMessage> worker_message(
-        WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
-
-    TaggedMessage worker_tagged_message(worker_message.get(),
-                                        sizeof(*worker_message),
-                                        kRebuildWorkOrderMessage);
-
-    const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
-               << "') to worker " << worker_index;
-
-    const MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           workers_->getClientID(worker_index),
-                                           move(worker_tagged_message));
-    CHECK(send_status == MessageBus::SendStatus::kOK);
-  }
-}
-
-}  // namespace quickstep