You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:40 UTC

[17/51] [abbrv] [partial] incubator-quickstep git commit: remove c++ files

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
deleted file mode 100644
index 6538d48..0000000
--- a/query_execution/Shiftboss.hpp
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
-
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <unordered_map>
-
-#include "catalog/CatalogDatabaseCache.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-class StorageManager;
-
-namespace serialization {
-class CatalogDatabase;
-class QueryContext;
-}  // namespace serialization
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief The Shiftboss accepts workorder protos from Foreman, and assigns
- *        the workorders to workers.
- **/
-class Shiftboss : public Thread {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param bus A pointer to the TMB.
-   * @param storage_manager The StorageManager to use.
-   * @param workers A pointer to the WorkerDirectory.
-   * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
-   *
-   * @note If cpu_id is not specified, the Shiftboss thread may be moved
-   *       around between different CPUs by the OS.
-  **/
-  Shiftboss(tmb::MessageBus *bus,
-            StorageManager *storage_manager,
-            WorkerDirectory *workers,
-            const int cpu_id = -1)
-      : bus_(DCHECK_NOTNULL(bus)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        workers_(DCHECK_NOTNULL(workers)),
-        cpu_id_(cpu_id),
-        shiftboss_client_id_(tmb::kClientIdNone),
-        foreman_client_id_(tmb::kClientIdNone),
-        max_msgs_per_worker_(1),
-        start_worker_index_(0u) {
-    // Check to have at least one Worker.
-    DCHECK_GT(workers->getNumWorkers(), 0u);
-
-    shiftboss_client_id_ = bus_->Connect();
-    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
-    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
-
-    // Messages between Foreman and Shiftboss.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
-
-    // Message sent to Worker.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
-
-    // Forward the following message types from Foreman to Workers.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
-
-    // Forward the following message types from Workers to Foreman.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-
-    // Clean up query execution states, i.e., QueryContext.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
-
-    // Stop itself.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
-    // Stop all workers.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
-
-    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
-      worker_addresses_.AddRecipient(workers_->getClientID(i));
-    }
-
-    registerWithForeman();
-  }
-
-  ~Shiftboss() override {
-  }
-
-  /**
-   * @brief Get the TMB client ID of Shiftboss thread.
-   *
-   * @return TMB client ID of shiftboss thread.
-   **/
-  inline tmb::client_id getBusClientID() const {
-    return shiftboss_client_id_;
-  }
-
-  /**
-   * @brief Get the Work Order processing capacity of all Workers managed by
-   *        Shiftboss during a single round of WorkOrder dispatch.
-   **/
-  inline std::size_t getWorkOrderCapacity() const {
-    DCHECK_NE(max_msgs_per_worker_, 0u);
-    return max_msgs_per_worker_ * workers_->getNumWorkers();
-  }
-
-  /**
-   * @brief Get the Worker to assign WorkOrders for execution. Block to wait if
-   *        all Workers have reached their capacity for queued WorkOrders.
-   **/
-  // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
-  // normal Work Order protos from Foreman and the generated rebuild Work Orders.
-  inline std::size_t getSchedulableWorker();
-
-  /**
-   * @brief Set the maximum number of messages that should be allocated to each
-   *        worker during a single round of WorkOrder dispatch.
-   *
-   * @param max_msgs_per_worker Maximum number of messages.
-   **/
-  inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
-    max_msgs_per_worker_ = max_msgs_per_worker;
-  }
-
- protected:
-  /**
-   * @brief The shiftboss receives workorders, and based on the response it
-   *        assigns workorders to workers.
-   *
-   * @note  The workers who get the messages from the Shiftboss execute and
-   *        subsequently delete the WorkOrder contained in the message.
-   **/
-  void run() override;
-
- private:
-  void registerWithForeman();
-
-  void processShiftbossRegistrationResponseMessage();
-
-  /**
-   * @brief Process the query initiate message and ack back.
-   *
-   * @param query_id The given query id.
-   * @param catalog_database_cache_proto The proto used to update
-   *        CatalogDatabaseCache.
-   * @param query_context_proto The QueryContext proto.
-   **/
-  void processQueryInitiateMessage(const std::size_t query_id,
-                                   const serialization::CatalogDatabase &catalog_database_cache_proto,
-                                   const serialization::QueryContext &query_context_proto);
-
-  /**
-   * @brief Process the RebuildWorkOrder initiate message and ack back.
-   *
-   * @param query_id The ID of the query to which this RebuildWorkOrder initiate
-   *        message belongs.
-   * @param op_index The index of the operator for rebuild work orders.
-   * @param dest_index The InsertDestination index in QueryContext to rebuild.
-   * @param rel_id The relation that needs to generate rebuild work orders.
-   **/
-  void processInitiateRebuildMessage(const std::size_t query_id,
-                                     const std::size_t op_index,
-                                     const QueryContext::insert_destination_id dest_index,
-                                     const relation_id rel_id);
-
-  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
-  // and Shiftboss and Worker thread pool.
-  tmb::MessageBus *bus_;
-
-  CatalogDatabaseCache database_cache_;
-  StorageManager *storage_manager_;
-  WorkerDirectory *workers_;
-
-  // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
-  const int cpu_id_;
-
-  tmb::client_id shiftboss_client_id_, foreman_client_id_;
-
-  // Unique per Shiftboss instance.
-  std::uint64_t shiftboss_index_;
-
-  // TMB recipients for all workers managed by this Shiftboss.
-  tmb::Address worker_addresses_;
-
-  // During a single round of WorkOrder dispatch, a Worker should be allocated
-  // at most this many WorkOrders.
-  std::size_t max_msgs_per_worker_;
-
-  // The worker index for scheduling Work Order.
-  std::size_t start_worker_index_;
-
-  // QueryContexts per query.
-  std::unordered_map<std::size_t, std::unique_ptr<QueryContext>> query_contexts_;
-
-  DISALLOW_COPY_AND_ASSIGN(Shiftboss);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
deleted file mode 100644
index 88d3039..0000000
--- a/query_execution/ShiftbossDirectory.hpp
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
-
-#include <cstddef>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class which keeps the metadata about the shiftbosses.
- *
- * @note This class is intended to be used only by ForemanDistributed thread.
- *       Therefore, none of the methods in this class are thread-safe.
- **/
-class ShiftbossDirectory {
- public:
-  /**
-   * @brief Constructor.
-  **/
-  ShiftbossDirectory() = default;
-
-  /**
-   * @brief Add the Shiftboss.
-   *
-   * @param shiftboss_id The TMB client ID of the Shiftboss thread.
-   * @param capacity The Work Order processing capacity of Shiftboss.
-   **/
-  void addShiftboss(const tmb::client_id shiftboss_id,
-                    const std::size_t capacity) {
-    client_id_indices_.emplace(shiftboss_id, client_ids_.size());
-    client_ids_.push_back(shiftboss_id);
-    work_order_capacities_.push_back(capacity);
-    num_queued_work_orders_.push_back(0u);
-  }
-
-  /**
-   * @brief Whether the ShiftbossDirectory is empty, i.e., has no Shiftboss.
-   *
-   * @return True if no Shiftboss in ShiftbossDirectory. Otherwise false.
-   **/
-  bool empty() const {
-    DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
-    DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
-    DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
-
-    return client_ids_.empty();
-  }
-
-  /**
-   * @brief Get the number of Shiftbosses in ShiftbossDirectory.
-   *
-   * @return The number of Shiftbosses in ShiftbossDirectory.
-   **/
-  std::size_t size() const {
-    DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
-    DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
-    DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
-
-    return client_ids_.size();
-  }
-
-  /**
-   * @brief Get the TMB client ID of the specified Shiftboss.
-   *
-   * @param shiftboss_index The index of Shiftboss.
-   *
-   * @return The TMB client ID of the given Shiftboss.
-   **/
-  tmb::client_id getClientId(const std::size_t shiftboss_index) const {
-    DCHECK_LT(shiftboss_index, size());
-    return client_ids_[shiftboss_index];
-  }
-
-  /**
-   * @brief Get the Shiftboss index from the specified client id.
-   *
-   * @param shiftboss_id The TMB client ID of Shiftboss.
-   *
-   * @return The index of the given Shiftboss.
-   **/
-  std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
-    const auto it = client_id_indices_.find(shiftboss_id);
-    DCHECK(it != client_id_indices_.end());
-
-    return it->second;
-  }
-
-  /**
-   * @brief Whether the given Shiftboss has reached its capacity.
-   *
-   * @param shiftboss_index The index of Shiftboss.
-   *
-   * @return True if reached the capacity. Otherwise false.
-   **/
-  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
-    DCHECK_LT(shiftboss_index, size());
-    return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
-  }
-
-  /**
-   * @brief Add the number of new work orders for the given Shiftboss.
-   *
-   * @param shiftboss_index The index of Shiftboss.
-   * @param num_new_work_orders The number of new work orders that will be
-   *        executed on the Shiftboss indexed by 'shiftboss_index'.
-   **/
-  void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
-                              const std::size_t num_new_work_orders) {
-    num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
-  }
-
-  /**
-   * @brief Increase the number of queued workorders for the given Shiftboss by 1.
-   *
-   * @param shiftboss_index The index of Shiftboss.
-   **/
-  void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
-    DCHECK_LT(shiftboss_index, size());
-    ++num_queued_work_orders_[shiftboss_index];
-  }
-
-  /**
-   * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
-   *
-   * @param shiftboss_index The index of Shiftboss.
-   **/
-  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
-    DCHECK_LT(shiftboss_index, size());
-    DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
-    --num_queued_work_orders_[shiftboss_index];
-  }
-
- private:
-  // The TMB client IDs of Shiftbosses.
-  // TODO(zuyu): Support deletions, as Shiftbosses go down.
-  std::vector<tmb::client_id> client_ids_;
-
-  // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
-  std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
-
-  // The max number of WorkOrders per Shiftboss.
-  std::vector<std::size_t> work_order_capacities_;
-
-  // The number of WorkOrders queued for execution per Shiftboss; each value
-  // should not be greater than the matching entry in work_order_capacities_.
-  std::vector<std::size_t> num_queued_work_orders_;
-
-  DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrderProtosContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrderProtosContainer.hpp b/query_execution/WorkOrderProtosContainer.hpp
deleted file mode 100644
index b037332..0000000
--- a/query_execution/WorkOrderProtosContainer.hpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <vector>
-
-#include "relational_operators/WorkOrder.pb.h"  // IWYU pragma: keep
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
- /**
-  * @brief A container used in the distributed version to hold the normal
-  *        (non-rebuild) WorkOrder protos for a given query.
-  *
-  * @note This container stays alive during the lifetime of the query.
-  **/
-class WorkOrderProtosContainer {
- public:
-  /**
-   * @brief Constructor
-   *
-   * @param num_operators Number of operators in the query DAG.
-   **/
-  explicit WorkOrderProtosContainer(const std::size_t num_operators)
-      : num_operators_(num_operators),
-        operator_containers_(num_operators_) {
-    DCHECK_NE(num_operators_, 0u);
-  }
-
-  /**
-   * @brief Destructor.
-   *
-   * @note If the query is executed normally, we should never encounter a
-   *       situation where at the time of deletion the WorkOrderProtosContainer has
-   *       pending WorkOrders.
-   **/
-  ~WorkOrderProtosContainer() {
-    for (std::size_t op = 0; op < num_operators_; ++op) {
-      if (hasWorkOrderProto(op)) {
-        LOG(WARNING) << "Destroying a WorkOrderProtosContainer that still has pending WorkOrder protos.";
-        break;
-      }
-    }
-  }
-
-  /**
-   * @brief Check if there are some pending WorkOrders for the given operator.
-   *
-   * @param operator_index Index of the operator.
-   *
-   * @return If there are pending WorkOrders.
-   **/
-  bool hasWorkOrderProto(const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return !operator_containers_[operator_index].empty();
-  }
-
-  /**
-   * @brief Get a WorkOrder proto for a given operator.
-   *
-   * @param operator_index The index of the operator.
-   *
-   * @return A WorkOrder proto released from this container, or nullptr if
-   *         no WorkOrder proto is available for the operator.
-   **/
-  serialization::WorkOrder* getWorkOrderProto(const std::size_t operator_index) {
-    DCHECK_LT(operator_index, num_operators_);
-
-    if (operator_containers_[operator_index].empty()) {
-      return nullptr;
-    }
-
-    serialization::WorkOrder *proto =
-        operator_containers_[operator_index].front().release();
-    operator_containers_[operator_index].pop();
-
-    return proto;
-  }
-
-  /**
-   * @brief Add a WorkOrder proto generated from a given
-   *        operator.
-   *
-   * @param proto A pointer to the WorkOrder proto; the container takes ownership.
-   * @param operator_index The index of the operator in the query DAG.
-   **/
-  void addWorkOrderProto(serialization::WorkOrder *proto,
-                         const std::size_t operator_index) {
-    DCHECK(proto != nullptr);
-    DCHECK_LT(operator_index, num_operators_);
-
-    operator_containers_[operator_index].emplace(
-        std::unique_ptr<serialization::WorkOrder>(proto));
-  }
-
-  /**
-   * @brief Get the number of all pending WorkOrders
-   *        for a given operator.
-   *
-   * @param operator_index The index of the operator.
-   *
-   * @return The number of pending WorkOrders.
-   **/
-  std::size_t getNumWorkOrderProtos(const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return operator_containers_[operator_index].size();
-  }
-
- private:
-  const std::size_t num_operators_;
-
-  std::vector<std::queue<std::unique_ptr<serialization::WorkOrder>>> operator_containers_;
-
-  DISALLOW_COPY_AND_ASSIGN(WorkOrderProtosContainer);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrdersContainer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.cpp b/query_execution/WorkOrdersContainer.cpp
deleted file mode 100644
index a16a8e6..0000000
--- a/query_execution/WorkOrdersContainer.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/WorkOrdersContainer.hpp"
-
-#include <algorithm>
-#include <cstddef>
-#include <list>
-#include <memory>
-#include <vector>
-
-#include "relational_operators/WorkOrder.hpp"
-
-#include "glog/logging.h"
-
-using std::unique_ptr;
-
-namespace quickstep {
-
-WorkOrdersContainer::~WorkOrdersContainer() {
-  // Warn (once) if any operator still has normal or rebuild WorkOrders queued.
-  for (std::size_t op = 0; op < num_operators_; ++op) {
-    if (hasNormalWorkOrder(op) || hasRebuildWorkOrder(op)) {
-      LOG(WARNING) << "Destroying a WorkOrdersContainer that still has pending WorkOrders.";
-      break;
-    }
-  }
-}
-
-WorkOrder* WorkOrdersContainer::InternalListContainer::getWorkOrderForNUMANode(
-    const int numa_node) {
-  for (std::list<unique_ptr<WorkOrder>>::iterator it = workorders_.begin();
-       it != workorders_.end();
-       ++it) {
-    const std::vector<int> &numa_nodes = (*it)->getPreferredNUMANodes();
-    if (!numa_nodes.empty()) {
-      if (std::find(numa_nodes.begin(), numa_nodes.end(), numa_node) !=
-          numa_nodes.end()) {
-        WorkOrder *work_order = it->release();  // Detach from the unique_ptr before erasing.
-        workorders_.erase(it);
-        return work_order;
-      }
-    }
-  }
-  return nullptr;  // No queued WorkOrder lists 'numa_node' as preferred.
-}
-
-void WorkOrdersContainer::OperatorWorkOrdersContainer::addWorkOrder(
-    WorkOrder *workorder) {
-  const std::vector<int> &numa_nodes = workorder->getPreferredNUMANodes();
-  if (!numa_nodes.empty()) {
-    if (numa_nodes.size() == 1) {
-      // This WorkOrder prefers exactly one NUMA node, used as the container index.
-      single_numa_node_workorders_[numa_nodes.front()].addWorkOrder(
-          workorder);
-    } else {
-      // This WorkOrder prefers more than one NUMA node.
-      multiple_numa_nodes_workorders_.addWorkOrder(workorder);
-    }
-  } else {
-    numa_agnostic_workorders_.addWorkOrder(workorder);  // No NUMA preference.
-  }
-}
-
-std::size_t
-    WorkOrdersContainer::InternalListContainer::getNumWorkOrdersForNUMANode(
-    const int numa_node) const {
-  std::size_t num_workorders = 0;
-  for (const unique_ptr<WorkOrder> &work_order : workorders_) {
-    const std::vector<int> &numa_nodes = work_order->getPreferredNUMANodes();
-    if (!numa_nodes.empty()) {
-      std::vector<int>::const_iterator
-          it = std::find(numa_nodes.begin(), numa_nodes.end(), numa_node);
-      if (it != numa_nodes.end()) {
-        // 'numa_node' is among this WorkOrder's preferred nodes; count it.
-       ++num_workorders;
-      }
-    }
-  }
-  return num_workorders;
-}
-
-bool WorkOrdersContainer::InternalListContainer::hasWorkOrderForNUMANode(
-    const int numa_node) const {
-  for (const unique_ptr<WorkOrder> &work_order : workorders_) {
-    const std::vector<int> &numa_nodes = work_order->getPreferredNUMANodes();
-    if (!numa_nodes.empty()) {
-      std::vector<int>::const_iterator
-          it = std::find(numa_nodes.begin(), numa_nodes.end(), numa_node);
-      if (it != numa_nodes.end()) {
-        // 'numa_node' is among this WorkOrder's preferred nodes; stop at the first hit.
-        return true;
-      }
-    }
-  }
-  return false;
-}
-
-WorkOrder* WorkOrdersContainer::OperatorWorkOrdersContainer::getWorkOrder(
-    const bool prefer_single_NUMA_node) {
-  // Try NUMA-agnostic WorkOrders first; the flag only orders the two fallbacks.
-  WorkOrder *workorder = numa_agnostic_workorders_.getWorkOrder();
-  if (workorder == nullptr) {
-    if (prefer_single_NUMA_node) {
-      workorder = getSingleNUMANodeWorkOrderHelper();
-      if (workorder == nullptr) {
-        workorder = multiple_numa_nodes_workorders_.getWorkOrder();
-      }
-    } else {
-      workorder = multiple_numa_nodes_workorders_.getWorkOrder();
-      if (workorder == nullptr) {
-        workorder = getSingleNUMANodeWorkOrderHelper();
-      }
-    }
-  }
-  return workorder;
-}
-
-WorkOrder* WorkOrdersContainer::OperatorWorkOrdersContainer::
-    getSingleNUMANodeWorkOrderHelper() {
-  WorkOrder *workorder = nullptr;
-  for (PtrVector<InternalQueueContainer>::iterator it =
-           single_numa_node_workorders_.begin();
-       it != single_numa_node_workorders_.end(); ++it) {
-    workorder = it->getWorkOrder();
-    if (workorder != nullptr) {
-      return workorder;  // The first container (in iteration order) that yields one wins.
-    }
-  }
-  return nullptr;  // No container yielded a WorkOrder.
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
deleted file mode 100644
index c1739bc..0000000
--- a/query_execution/WorkOrdersContainer.hpp
+++ /dev/null
@@ -1,515 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
-
-#include <cstddef>
-#include <list>
-#include <memory>
-#include <queue>
-#include <vector>
-
-#include "relational_operators/WorkOrder.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PtrVector.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
- /**
-  * @brief A container to hold the WorkOrders for a given query.
-  * @note This container stays alive during the lifetime of the query.
-  * @note The NUMA node indexing is assumed to start from 0.
-  **/
-class WorkOrdersContainer {
- public:
-  /**
-   * @brief Constructor
-   *
-   * @param num_operators Number of operators in the query DAG.
-   * @param num_numa_nodes Number of NUMA nodes in the system.
-   **/
-  WorkOrdersContainer(const std::size_t num_operators,
-                      const std::size_t num_numa_nodes)
-      : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
-    DEBUG_ASSERT(num_operators != 0);
-    for (std::size_t op = 0; op < num_operators; ++op) {
-      normal_workorders_.push_back(
-          new OperatorWorkOrdersContainer(num_numa_nodes_));
-      rebuild_workorders_.push_back(
-          new OperatorWorkOrdersContainer(num_numa_nodes_));
-    }
-  }
-
-  /**
-   * @brief Destructor.
-   *
-   * @note If the query is executed normally, we should never encounter a
-   *       situation where at the time of deletion the WorkOrdersContainer has
-   *       pending WorkOrders.
-   **/
-  ~WorkOrdersContainer();
-
-  /**
-   * @brief Check if there are some pending WorkOrders for the given operator.
-   *
-   * @param operator_index Index of the operator.
-   *
-   * @return If there are pending WorkOrders.
-   **/
-  inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return normal_workorders_[operator_index].hasWorkOrder();
-  }
-
-  /**
-   * @brief Check if there are some pending normal (i.e. non-rebuild) WorkOrders
-   *        for the given operator which prefer the specified NUMA node.
-   *
-   * @param operator_index Index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return If there are pending WorkOrders for the given operator which prefer
-   *         numa_node_id.
-   **/
-  inline bool hasNormalWorkOrderForNUMANode(
-      const std::size_t operator_index, const int numa_node_id) const {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Check if there are some pending rebuild WorkOrders for the given
-   *        operator.
-   *
-   * @param operator_index Index of the operator.
-   *
-   * @return If there are pending rebuild WorkOrders.
-   **/
-  inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return rebuild_workorders_[operator_index].hasWorkOrder();
-  }
-
-  /**
-   * @brief Check if there are some pending rebuild WorkOrders for the given
-   *        operator which prefer the specified NUMA node.
-   *
-   * @param operator_index Index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return If there are pending rebuild WorkOrders for the given operator which
-   *         prefer numa_node_id.
-   **/
-  inline bool hasRebuildWorkOrderForNUMANode(
-      const std::size_t operator_index, const int numa_node_id) const {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Get a normal (non-rebuild) WorkOrder for a given operator which
-   *        prefers the given NUMA node.
-   *
-   * @param operator_index The index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return A WorkOrder which prefers numa_node_id. If no such WorkOrder is
-   *         available, return nullptr. The caller is responsible for taking the
-   *         ownership.
-   **/
-  WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
-                                           const int numa_node_id) {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return normal_workorders_[operator_index].getWorkOrderForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Get a normal (non-rebuild) WorkOrder for a given operator.
-   *
-   * @param operator_index The index of the operator.
-   * @param prefer_single_NUMA_node If true, first try to get workorders which
-   *        prefer exactly one NUMA node over workorders which list more than
-   *        one NUMA node as their preference.
-   *
-   * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
-   *         caller is responsible for taking the ownership.
-   **/
-  WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
-                                const bool prefer_single_NUMA_node = true) {
-    DCHECK_LT(operator_index, num_operators_);
-    return normal_workorders_[operator_index].getWorkOrder(
-        prefer_single_NUMA_node);
-  }
-
-  /**
-   * @brief Get a rebuild WorkOrder for a given operator which prefers the
-   *        specified NUMA node.
-   *
-   * @param operator_index The index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return A WorkOrder that prefers numa_node_id. If no such WorkOrder is
-   *         available, return nullptr. The caller is responsible for taking the
-   *         ownership.
-   **/
-  WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
-                                            const int numa_node_id) {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Get a rebuild WorkOrder for the given operator
-   *
-   * @param operator_index The index of the operator.
-   * @param prefer_single_NUMA_node If true, first try to get workorders which
-   *        prefer exactly one NUMA node over workorders which list more than
-   *        one NUMA node as their preference.
-   *
-   * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
-   *         caller is responsible for taking the ownership.
-   **/
-  WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
-                                 const bool prefer_single_NUMA_node = true) {
-    DCHECK_LT(operator_index, num_operators_);
-    return rebuild_workorders_[operator_index].getWorkOrder(
-        prefer_single_NUMA_node);
-  }
-
-  /**
-   * @brief Add a normal (non-rebuild) WorkOrder generated from a given
-   *        operator.
-   *
-   * @note Take the ownership of \p workorder.
-   * @note The workorder to be added contains information about its preferred
-   *       NUMA nodes. This information is used to insert the WorkOrder
-   *       appropriately.
-   *
-   * @param workorder A pointer to the WorkOrder to be added.
-   * @param operator_index The index of the operator in the query DAG.
-   **/
-  void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
-    DCHECK(workorder != nullptr);
-    DCHECK_LT(operator_index, num_operators_);
-    normal_workorders_[operator_index].addWorkOrder(workorder);
-  }
-
-  /**
-   * @brief Add a rebuild WorkOrder generated from a given operator.
-   *
-   * @note Take the ownership of \p workorder.
-   * @note The workorder to be added contains information about its preferred
-   *       NUMA nodes. This information is used to insert the WorkOrder
-   *       appropriately.
-   *
-   * @param workorder A pointer to the WorkOrder to be added.
-   * @param operator_index The index of the operator in the query DAG.
-   **/
-  void addRebuildWorkOrder(WorkOrder *workorder,
-                           const std::size_t operator_index) {
-    DCHECK(workorder != nullptr);
-    DCHECK_LT(operator_index, num_operators_);
-    rebuild_workorders_[operator_index].addWorkOrder(workorder);
-  }
-
-  /**
-   * @brief Get the number of pending normal WorkOrders for a given operator
-   *        which prefer the specified NUMA node.
-   *
-   * @param operator_index The index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return The number of pending WorkOrders which prefer numa_node_id.
-   **/
-  inline std::size_t getNumNormalWorkOrdersForNUMANode(
-      const std::size_t operator_index, const int numa_node_id) const {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Get the number of all pending normal (i.e. non-rebuild) WorkOrders
-   *        for a given operator.
-   *
-   * @param operator_index The index of the operator.
-   *
-   * @return The number of pending normal WorkOrders.
-   **/
-  inline std::size_t getNumNormalWorkOrders(
-      const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return normal_workorders_[operator_index].getNumWorkOrders();
-  }
-
-  /**
-   * @brief Get the number of pending rebuild WorkOrders for a given operator
-   *        which prefer the specified NUMA node.
-   *
-   * @param operator_index The index of the operator.
-   * @param numa_node_id The NUMA node.
-   *
-   * @return The number of pending WorkOrders which prefer numa_node_id.
-   **/
-  inline std::size_t getNumRebuildWorkOrdersForNUMANode(
-      const std::size_t operator_index, const int numa_node_id) const {
-    DCHECK_LT(operator_index, num_operators_);
-    DCHECK_GE(numa_node_id, 0);
-    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-    return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
-        numa_node_id);
-  }
-
-  /**
-   * @brief Get the number of all pending rebuild WorkOrders for a given
-   *        operator.
-   *
-   * @param operator_index The index of the operator.
-   *
-   * @return The number of pending rebuild WorkOrders.
-   **/
-  inline std::size_t getNumRebuildWorkOrders(
-      const std::size_t operator_index) const {
-    DCHECK_LT(operator_index, num_operators_);
-    return rebuild_workorders_[operator_index].getNumWorkOrders();
-  }
-
- private:
-  /**
-   * @brief An internal queue-based container structure to hold the WorkOrders.
-   **/
-  class InternalQueueContainer {
-   public:
-    InternalQueueContainer() {
-    }
-
-    inline void addWorkOrder(WorkOrder *workorder) {
-      workorders_.emplace(std::unique_ptr<WorkOrder>(workorder));
-    }
-
-    inline WorkOrder* getWorkOrder() {
-      if (workorders_.empty()) {
-        return nullptr;
-      }
-
-      WorkOrder *work_order = workorders_.front().release();
-      workorders_.pop();
-      return work_order;
-    }
-
-    inline bool hasWorkOrder() const {
-      return !workorders_.empty();
-    }
-
-    inline std::size_t getNumWorkOrders() const {
-      return workorders_.size();
-    }
-
-   private:
-    std::queue<std::unique_ptr<WorkOrder>> workorders_;
-
-    DISALLOW_COPY_AND_ASSIGN(InternalQueueContainer);
-  };
-
-  /**
-   * @brief An internal list-based container structure to hold the WorkOrders.
-   *
-   * @note We use this class for WorkOrders whose inputs come from multiple NUMA
-   *       nodes e.g. a HashJoinWorkOrder whose probe block and the hash table
-   *       may reside on two different NUMA nodes.
-   **/
-  class InternalListContainer {
-   public:
-    InternalListContainer() {
-    }
-
-    inline void addWorkOrder(WorkOrder *workorder) {
-      workorders_.emplace_back(std::unique_ptr<WorkOrder>(workorder));
-    }
-
-    inline WorkOrder* getWorkOrder() {
-      if (workorders_.empty()) {
-        return nullptr;
-      }
-
-      WorkOrder *work_order = workorders_.front().release();
-      workorders_.pop_front();
-      return work_order;
-    }
-
-    /**
-     * @note This method has O(N) complexity.
-     **/
-    WorkOrder* getWorkOrderForNUMANode(const int numa_node);
-
-    inline bool hasWorkOrder() const {
-      return !workorders_.empty();
-    }
-
-    /**
-     * @brief Check if numa_node is in the set of preferred NUMA nodes for at
-     *        least one WorkOrder in this container.
-     *
-     * @note This method has O(N) complexity.
-     **/
-    bool hasWorkOrderForNUMANode(const int numa_node) const;
-
-    inline std::size_t getNumWorkOrders() const {
-      return workorders_.size();
-    }
-
-    /**
-     * @brief Return the number of WorkOrders that list the numa_node as one of
-     *        the preferred nodes of execution.
-     **/
-    std::size_t getNumWorkOrdersForNUMANode(const int numa_node) const;
-
-   private:
-    std::list<std::unique_ptr<WorkOrder>> workorders_;
-
-    DISALLOW_COPY_AND_ASSIGN(InternalListContainer);
-  };
-
-  /**
-   * @brief A container to hold all the WorkOrders generated by one operator.
-   **/
-  class OperatorWorkOrdersContainer {
-   public:
-    explicit OperatorWorkOrdersContainer(const std::size_t num_numa_nodes)
-        : num_numa_nodes_(num_numa_nodes) {
-      for (std::size_t numa_node = 0; numa_node < num_numa_nodes; ++numa_node) {
-        single_numa_node_workorders_.push_back(new InternalQueueContainer());
-      }
-    }
-
-    void addWorkOrder(WorkOrder *workorder);
-
-    bool hasWorkOrderForNUMANode(const int numa_node_id) const {
-      DCHECK_GE(numa_node_id, 0);
-      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-      return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
-             multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
-                 numa_node_id);
-    }
-
-    bool hasWorkOrder() const {
-      if (!(numa_agnostic_workorders_.hasWorkOrder() ||
-            multiple_numa_nodes_workorders_.hasWorkOrder())) {
-        for (std::size_t i = 0; i < num_numa_nodes_; ++i) {
-          if (hasWorkOrderForNUMANode(i)) {
-            return true;
-          }
-        }
-        return false;
-      }
-      return true;
-    }
-
-    std::size_t getNumWorkOrdersForNUMANode(
-        const int numa_node_id) const {
-      DCHECK_GE(numa_node_id, 0);
-      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-      return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
-             multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
-                 numa_node_id);
-    }
-
-    inline std::size_t getNumWorkOrders() const {
-      std::size_t num_workorders = numa_agnostic_workorders_.getNumWorkOrders();
-      // for each NUMA node involved ..
-      for (PtrVector<InternalQueueContainer>::const_iterator it =
-               single_numa_node_workorders_.begin();
-           it != single_numa_node_workorders_.end();
-           ++it) {
-        // Add the number of workorders for the NUMA node.
-        num_workorders += it->getNumWorkOrders();
-      }
-      // Add the number of workorders who have multiple NUMA nodes as input.
-      num_workorders += multiple_numa_nodes_workorders_.getNumWorkOrders();
-      return num_workorders;
-    }
-
-    WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
-      DCHECK_GE(numa_node_id, 0);
-      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
-      WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
-      if (work_order == nullptr) {
-        work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
-            numa_node_id);
-      }
-      return work_order;
-    }
-
-    WorkOrder* getWorkOrder(const bool prefer_single_NUMA_node = true);
-
-   private:
-    WorkOrder* getSingleNUMANodeWorkOrderHelper();
-
-    const std::size_t num_numa_nodes_;
-
-    // A container to store NUMA agnostic workorders.
-    InternalQueueContainer numa_agnostic_workorders_;
-
-    // A vector of containers to store workorders which prefer exactly one NUMA
-    // node for execution.
-    PtrVector<InternalQueueContainer> single_numa_node_workorders_;
-
-    // A container to store workorders which prefer more than one NUMA node for
-    // execution.
-    InternalListContainer multiple_numa_nodes_workorders_;
-    // Note that no workorder should be shared among the *_workorders_ structures.
-
-    DISALLOW_COPY_AND_ASSIGN(OperatorWorkOrdersContainer);
-  };
-
-  const std::size_t num_operators_;
-  const std::size_t num_numa_nodes_;
-
-  PtrVector<OperatorWorkOrdersContainer> normal_workorders_;
-  PtrVector<OperatorWorkOrdersContainer> rebuild_workorders_;
-
-  DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
-};
-/** @} */
-
-}  // namespace quickstep
-
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
deleted file mode 100644
index 31eb964..0000000
--- a/query_execution/Worker.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/Worker.hpp"
-
-#include <chrono>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <utility>
-
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
-#include "relational_operators/WorkOrder.hpp"
-#include "threading/ThreadIDBasedMap.hpp"
-#include "threading/ThreadUtil.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::size_t;
-using std::uint64_t;
-
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-using serialization::WorkOrderCompletionMessage;
-
-void Worker::run() {
-  if (cpu_id_ >= 0) {
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-  ClientIDMap *thread_id_map = ClientIDMap::Instance();
-  thread_id_map->addValue(worker_client_id_);
-  for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until next
-    // message is received.
-    const AnnotatedMessage annotated_msg =
-        bus_->Receive(worker_client_id_, 0, true);
-    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    switch (tagged_message.message_type()) {
-#ifdef QUICKSTEP_DISTRIBUTED
-      case kShiftbossRegistrationResponseMessage: {
-        serialization::ShiftbossRegistrationResponseMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        shiftboss_index_ = proto.shiftboss_index();
-        break;
-      }
-#endif  // QUICKSTEP_DISTRIBUTED
-      case kWorkOrderMessage: {
-        WorkOrderCompletionMessage proto;
-        executeWorkOrderHelper(tagged_message, &proto);
-        sendWorkOrderCompleteMessage(
-            annotated_msg.sender, proto, kWorkOrderCompleteMessage);
-        break;
-      }
-      case kRebuildWorkOrderMessage: {
-        WorkOrderCompletionMessage proto;
-        executeWorkOrderHelper(tagged_message, &proto, true /* is_rebuild */);
-        sendWorkOrderCompleteMessage(
-            annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
-        break;
-      }
-      case kPoisonMessage: {
-        // Remove the entry from the thread ID based map for this worker thread.
-        thread_id_map->removeValue();
-        return;
-      }
-    }
-  }
-}
-
-void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                          const WorkOrderCompletionMessage &proto,
-                                          const message_type_id message_type) {
-  // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage tagged_message(
-      static_cast<const void *>(proto_bytes), proto_length, message_type);
-  std::free(proto_bytes);
-
-  DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
-             << "') to Scheduler with TMB client ID " << receiver;
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(
-          bus_, worker_client_id_, receiver, std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-}
-
-void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
-                                    WorkOrderCompletionMessage *proto,
-                                    const bool is_rebuild_work_order) {
-  std::chrono::time_point<std::chrono::steady_clock> start, end;
-  WorkerMessage worker_message(
-      *static_cast<const WorkerMessage *>(tagged_message.message()));
-  DCHECK(worker_message.getWorkOrder() != nullptr);
-  const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
-
-  // Start measuring the execution time.
-  start = std::chrono::steady_clock::now();
-  worker_message.getWorkOrder()->execute();
-  end = std::chrono::steady_clock::now();
-  delete worker_message.getWorkOrder();
-
-  // Convert the measured timestamps to epoch times in microseconds.
-  const uint64_t execution_start_time =
-      std::chrono::duration_cast<std::chrono::microseconds>(
-          start.time_since_epoch()).count();
-  const uint64_t execution_end_time =
-      std::chrono::duration_cast<std::chrono::microseconds>(
-          end.time_since_epoch()).count();
-
-  // Construct the proto message.
-  proto->set_work_order_type(is_rebuild_work_order ? WorkOrderCompletionMessage::REBUILD
-                                                   : WorkOrderCompletionMessage::NORMAL);
-  proto->set_operator_index(worker_message.getRelationalOpIndex());
-  proto->set_query_id(query_id_for_workorder);
-  proto->set_worker_thread_index(worker_thread_index_);
-  proto->set_execution_start_time(execution_start_time);
-  proto->set_execution_end_time(execution_end_time);
-
-#ifdef QUICKSTEP_DISTRIBUTED
-  proto->set_shiftboss_index(shiftboss_index_);
-#endif  // QUICKSTEP_DISTRIBUTED
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
deleted file mode 100644
index feb7c7d..0000000
--- a/query_execution/Worker.hpp
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
-
-#include <cstddef>
-#include <cstdint>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace tmb { class TaggedMessge; }
-
-namespace quickstep {
-
-namespace serialization { class WorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
- /**
-  * @brief A worker thread executes WorkOrders which it receives from Foreman.
-  **/
-class Worker : public Thread {
- public:
-  /**
-   * @brief Constructor
-   *
-   * @param worker_thread_index The unique index of this worker thread in
-   *        WorkerDirectory.
-   * @param bus A pointer to the TMB.
-   * @param cpu_id The ID of the CPU to which this worker thread can be pinned.
-   *
-   * @note If cpu_id is not specified, the worker thread may be moved
-   *       around between different CPUs by the OS.
-   **/
-  Worker(const std::size_t worker_thread_index,
-         MessageBus *bus,
-         int cpu_id = -1)
-      : worker_thread_index_(worker_thread_index),
-        bus_(bus),
-        cpu_id_(cpu_id) {
-    DEBUG_ASSERT(bus_ != nullptr);
-    worker_client_id_ = bus_->Connect();
-
-    bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(worker_client_id_,
-                                 kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage);
-
-    bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
-    bus_->RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
-
-#ifdef QUICKSTEP_DISTRIBUTED
-    bus_->RegisterClientAsReceiver(worker_client_id_, kShiftbossRegistrationResponseMessage);
-#endif  // QUICKSTEP_DISTRIBUTED
-  }
-
-  ~Worker() override {}
-
-  /**
-   * @brief Get the TMB client ID of this worker.
-   *
-   * @return TMB client ID of this worker.
-   **/
-  inline client_id getBusClientID() const {
-    return worker_client_id_;
-  }
-
- protected:
-  /**
-   * @brief Receive one WorkOrder at a time and execute it. After execution, send
-   *        a completion message to the Foreman and delete the WorkOrder.
-   *
-   * @note  On receiving a poison message, the method terminates.
-   **/
-  void run() override;
-
- private:
-  /**
-   * @brief A helper method to execute the WorkOrder and construct a
-   *        completion message.
-   *
-   * @note Right now a single helper method works for all message types.
-   *       If different message types need to collect different statistics for
-   *       the WorkOrder execution, we need to create different helper methods,
-   *       one for each message type.
-   *
-   * @param tagged_message The TaggedMessage which consists of the WorkOrder.
-   * @param proto The proto message to be sent.
-   * @param is_rebuild_work_order Whether it is used for a RebuildWorkOrder.
-   **/
-  void executeWorkOrderHelper(const TaggedMessage &tagged_message,
-                              serialization::WorkOrderCompletionMessage *proto,
-                              const bool is_rebuild_work_order = false);
-
-  /**
-   * @brief A helper method to send the WorkOrder completion message.
-   *
-   * @param receiver The TMB client ID of the receiver.
-   * @param proto The proto message to be sent.
-   * @param message_type The ID of the type of the message being sent.
-   **/
-  void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                    const serialization::WorkOrderCompletionMessage &proto,
-                                    const message_type_id message_type);
-
-  const std::size_t worker_thread_index_;
-  MessageBus *bus_;
-
-  const int cpu_id_;
-  client_id worker_client_id_;
-
-#ifdef QUICKSTEP_DISTRIBUTED
-  std::size_t shiftboss_index_;
-#endif  // QUICKSTEP_DISTRIBUTED
-
-  DISALLOW_COPY_AND_ASSIGN(Worker);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerDirectory.hpp b/query_execution/WorkerDirectory.hpp
deleted file mode 100644
index be5cfc1..0000000
--- a/query_execution/WorkerDirectory.hpp
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
-
-#include <algorithm>
-#include <cstddef>
-#include <iterator>
-#include <utility>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-/**
- * @brief Bookkeeping for the worker threads: for every worker, its TMB client
- *        ID, the NUMA node it is pinned to and its number of queued
- *        workorders.
- *
- * @note The number of workorders per worker used in this class are as viewed by
- *       Foreman. An alternative implementation can use TMB to query the queue
- *       lengths of each worker.
- *
- * @note This class is intended to be used only by Foreman thread. Therefore
- *       none of the methods in this class are thread-safe.
- **/
-class WorkerDirectory {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param num_workers The number of workers. Expected to be positive.
-   * @param client_ids A vector of TMB client IDs for the workers, indexed by
-   *        the logical worker ID. Expected to hold num_workers elements.
-   * @param numa_node_ids A vector of NUMA node IDs where the workers are
-   *        pinned, indexed by the logical worker ID. If a worker is not
-   *        pinned to any NUMA node, the NUMA node ID for that worker is -1.
-   *        Expected to hold num_workers elements.
-   **/
-  WorkerDirectory(const std::size_t num_workers,
-                  const std::vector<client_id> &client_ids,
-                  const std::vector<int> &numa_node_ids)
-    : num_workers_(num_workers),
-      num_queued_workorders_(num_workers, 0),
-      numa_node_ids_(numa_node_ids),
-      client_ids_(client_ids) {
-    DEBUG_ASSERT(num_workers > 0);
-    DEBUG_ASSERT(client_ids.size() == num_workers);
-    DEBUG_ASSERT(numa_node_ids.size() == num_workers);
-  }
-
-  /**
-   * @brief Get the number of worker threads.
-   *
-   * @return The number of worker threads at the time the function is called.
-   **/
-  inline std::size_t getNumWorkers() const {
-    return num_workers_;
-  }
-
-  /**
-   * @brief Get the number of queued workorders for the given worker.
-   *
-   * @note The queued number of workorders consist of the workorders waiting for
-   *       execution and the workorder being executed by the specified worker
-   *       at the time this function is called.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   *
-   * @return The number of queued workorders.
-   **/
-  inline std::size_t getNumQueuedWorkOrders(
-      const std::size_t worker_thread_index) const {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    return num_queued_workorders_[worker_thread_index];
-  }
-
-  /**
-   * @brief Increment the number of queued workorders for the given worker by 1.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   **/
-  inline void incrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    ++num_queued_workorders_[worker_thread_index];
-  }
-
-  /**
-   * @brief Decrement the number of queued workorders for the given worker by 1.
-   *
-   * @note The worker must have at least one queued workorder; the counter is
-   *       unsigned and would otherwise wrap around.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   **/
-  inline void decrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    DEBUG_ASSERT(num_queued_workorders_[worker_thread_index] >= 1);
-    --num_queued_workorders_[worker_thread_index];
-  }
-
-  /**
-   * @brief Get the NUMA node where the specified worker is pinned to.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   *
-   * @return The NUMA node ID where the given worker is pinned. If the worker
-   *         hasn't been pinned to any NUMA node, this value is -1.
-   **/
-  inline int getNUMANode(const std::size_t worker_thread_index) const {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    return numa_node_ids_[worker_thread_index];
-  }
-
-  /**
-   * @brief Get the TMB client ID of the specified worker.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   *
-   * @return The TMB client ID of the given worker.
-   **/
-  inline client_id getClientID(const std::size_t worker_thread_index) const {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    return client_ids_[worker_thread_index];
-  }
-
-  /**
-   * @brief Generate address of a worker.
-   *
-   * @param worker_thread_index The logical ID of the given worker.
-   *
-   * @return TMB Address of the given worker, i.e. an Address whose only
-   *         recipient is the worker's client ID.
-   **/
-  inline Address getWorkerAddress(const std::size_t worker_thread_index) const {
-    DEBUG_ASSERT(worker_thread_index < num_workers_);
-    Address worker_address;
-    worker_address.AddRecipient(client_ids_[worker_thread_index]);
-    return worker_address;
-  }
-
-  /**
-   * @brief Add metadata about a new worker. The new worker starts with zero
-   *        queued workorders and gets the next free logical ID.
-   *
-   * @param cid The TMB client ID of the new worker.
-   * @param numa_node_id The NUMA node ID where the new worker is pinned.
-   **/
-  inline void addWorker(const client_id cid, const int numa_node_id) {
-    ++num_workers_;
-    num_queued_workorders_.push_back(0);
-    numa_node_ids_.push_back(numa_node_id);
-    client_ids_.push_back(cid);
-  }
-
-  /**
-   * @brief Get information about the least loaded worker at the time the
-   *        function is called.
-   *
-   * TODO(harshad) This method performs a linear search. Using a dynamic
-   *               priority queue to track worker loads can help this method
-   *               execute faster.
-   * TODO(harshad) Extend this method to find least loaded workers on a given
-   *               NUMA node.
-   *
-   * @note If there are multiple workers with the least amount of load, the
-   *       worker with the smaller logical ID is returned.
-   *
-   * @note The directory must contain at least one worker; otherwise the
-   *       search yields the end iterator, which must not be dereferenced.
-   *
-   * @return A pair: First element is the logical ID of the least loaded worker
-   *         and second element is the number of queued workorders for this
-   *         worker at the time the function is called.
-   **/
-  std::pair<std::size_t, std::size_t> getLeastLoadedWorker() const {
-    std::vector<std::size_t>::const_iterator min_element_iter =
-        std::min_element(std::begin(num_queued_workorders_),
-                         std::end(num_queued_workorders_));
-    DEBUG_ASSERT(min_element_iter != num_queued_workorders_.end());
-    const std::size_t least_loaded_worker_thread_index =
-        std::distance(num_queued_workorders_.begin(), min_element_iter);
-    return std::make_pair(least_loaded_worker_thread_index, *min_element_iter);
-  }
-
-  /**
-   * @brief Get information about the most loaded worker at the time the
-   *        function is called.
-   *
-   * TODO(harshad) This method performs a linear search. Using a dynamic
-   *               priority queue to track worker loads can help this method
-   *               execute faster.
-   * TODO(harshad) Extend this method to find most loaded workers on a given
-   *               NUMA node.
-   *
-   * @note If there are multiple workers with the maximum amount of load, the
-   *       worker with the smaller logical ID is returned.
-   *
-   * @note The directory must contain at least one worker; otherwise the
-   *       search yields the end iterator, which must not be dereferenced.
-   *
-   * @return A pair: First element is the logical ID of the most loaded worker
-   *         and second element is the number of queued workorders for this
-   *         worker at the time the function is called.
-   **/
-  std::pair<std::size_t, std::size_t> getMostLoadedWorker() const {
-    std::vector<std::size_t>::const_iterator max_element_iter =
-        std::max_element(std::begin(num_queued_workorders_),
-                         std::end(num_queued_workorders_));
-    DEBUG_ASSERT(max_element_iter != num_queued_workorders_.end());
-    const std::size_t most_loaded_worker_thread_index =
-        std::distance(num_queued_workorders_.begin(), max_element_iter);
-    return std::make_pair(most_loaded_worker_thread_index, *max_element_iter);
-  }
-
- private:
-  // The number of workers tracked; kept equal to the size of each vector
-  // below by the constructor's expectations and by addWorker().
-  std::size_t num_workers_;
-
-  // The number of WorkOrders queued for execution per worker.
-  std::vector<std::size_t> num_queued_workorders_;
-
-  // The NUMA node IDs where the workers are pinned to. If a worker is not
-  // pinned, the ID is -1.
-  std::vector<int> numa_node_ids_;
-
-  // The TMB client IDs of the workers, indexed by the logical worker ID.
-  std::vector<client_id> client_ids_;
-
-  DISALLOW_COPY_AND_ASSIGN(WorkerDirectory);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
deleted file mode 100644
index bd84ac6..0000000
--- a/query_execution/WorkerMessage.hpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_
-
-#include <cstddef>
-
-namespace quickstep {
-
-class WorkOrder;
-/**
- * @brief A message sent from Foreman to a worker, carrying a WorkOrder to
- *        execute together with the index of the operator that produced it.
- *
- * @note  Instances can be copy-constructed; the const data members rule out
- *        assignment.
- **/
-class WorkerMessage {
- public:
-  // Value of the recipient hint when no particular worker has been suggested.
-  static constexpr int kInvalidRecipientIndexHint = -1;
-
-  // Distinguishes rebuild WorkOrders from normal WorkOrders.
-  enum class WorkerMessageType {
-    kRebuildWorkOrder = 0,
-    kWorkOrder,
-  };
-
-  /**
-   * @brief Factory for a message that carries a rebuild WorkOrder.
-   *
-   * @param rebuild_workorder The rebuild WorkOrder the worker should execute.
-   * @param relational_op_index The index, in the query plan DAG, of the
-   *        relational operator that generated the rebuild WorkOrder.
-   *
-   * @return A heap-allocated message of type kRebuildWorkOrder.
-   **/
-  static WorkerMessage* RebuildWorkOrderMessage(WorkOrder *rebuild_workorder, const std::size_t relational_op_index) {
-    WorkerMessage *message = new WorkerMessage(
-        rebuild_workorder, relational_op_index, WorkerMessageType::kRebuildWorkOrder);
-    return message;
-  }
-
-  /**
-   * @brief Factory for a message that carries a normal WorkOrder.
-   *
-   * @param workorder The WorkOrder the worker should execute.
-   * @param relational_op_index The index, in the query plan DAG, of the
-   *        relational operator that generated the WorkOrder.
-   *
-   * @return A heap-allocated message of type kWorkOrder.
-   **/
-  static WorkerMessage* WorkOrderMessage(WorkOrder *workorder, const std::size_t relational_op_index) {
-    WorkerMessage *message = new WorkerMessage(
-        workorder, relational_op_index, WorkerMessageType::kWorkOrder);
-    return message;
-  }
-
-  /**
-   * @brief Destructor. Leaves the carried WorkOrder untouched.
-   **/
-  ~WorkerMessage() {}
-
-  /**
-   * @brief Get the WorkOrder carried by this message.
-   * @return A pointer to the WorkOrder which the worker should execute.
-   **/
-  WorkOrder* getWorkOrder() const { return work_unit_; }
-
-  /**
-   * @brief Get the index, in the DAG, of the relational operator that
-   *        generated the WorkOrder.
-   **/
-  std::size_t getRelationalOpIndex() const { return relational_op_index_; }
-
-  /**
-   * @brief Get the type of this WorkerMessage.
-   **/
-  WorkerMessageType getType() const { return type_; }
-
-  /**
-   * @brief Suggest a worker thread as the recipient of this message.
-   *
-   * @param recipient_index_hint The hint i.e. the worker thread index.
-   **/
-  void setRecipientHint(const int recipient_index_hint) {
-    recipient_index_hint_ = recipient_index_hint;
-  }
-
-  /**
-   * @brief Get the suggested recipient worker thread. A value of
-   *        kInvalidRecipientIndexHint means that no valid hint is set.
-   **/
-  int getRecipientHint() const { return recipient_index_hint_; }
-
- private:
-  /**
-   * @brief Constructor, reachable only through the static factories. The
-   *        recipient hint starts out as kInvalidRecipientIndexHint.
-   *
-   * @param work_order The WorkOrder to be executed by the worker.
-   * @param op_index The index of the relational operator in the query plan
-   *        DAG that generated the given WorkOrder.
-   * @param message_type Type of the WorkerMessage.
-   **/
-  WorkerMessage(WorkOrder *work_order,
-                const std::size_t op_index,
-                const WorkerMessageType message_type)
-      : work_unit_(work_order),
-        relational_op_index_(op_index),
-        type_(message_type),
-        recipient_index_hint_(kInvalidRecipientIndexHint) {}
-
-  WorkOrder *work_unit_;
-  const std::size_t relational_op_index_;
-  const WorkerMessageType type_;
-  int recipient_index_hint_;
-};
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerSelectionPolicy.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerSelectionPolicy.hpp b/query_execution/WorkerSelectionPolicy.hpp
deleted file mode 100644
index 4f4c6eb..0000000
--- a/query_execution/WorkerSelectionPolicy.hpp
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_
-
-#include <cstddef>
-#include <random>
-#include <utility>
-
-#include "query_execution/WorkerDirectory.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief Base class for a policy to select workers for workorder execution.
- **/
-class WorkerSelectionPolicy {
- public:
-  /**
-   * @brief Constructor. Raises FATAL_ERROR if the directory has no workers.
-   *
-   * @param worker_directory The worker directory. Only a reference to it is
-   *        stored, so it has to outlive this policy.
-   **/
-  explicit WorkerSelectionPolicy(const WorkerDirectory &worker_directory)
-      : worker_directory_(worker_directory) {
-    if (worker_directory.getNumWorkers() == 0) {
-      FATAL_ERROR("WorkerSelectionPolicy needs at least one worker");
-    }
-  }
-
-  /**
-   * @brief Virtual destructor, so that a concrete policy can be destroyed
-   *        safely through a pointer to this base class.
-   **/
-  virtual ~WorkerSelectionPolicy() {}
-
-  /**
-   * @brief Choose the worker for next workorder execution based on the policy.
-   *
-   * @return The logical ID of the worker chosen for next workorder execution.
-   **/
-  virtual std::size_t getNextWorkerID() = 0;
-
- protected:
-  // The directory consulted by the concrete policies (not owned).
-  const WorkerDirectory &worker_directory_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(WorkerSelectionPolicy);
-};
-
-/**
- * @brief Policy that hands out worker IDs cyclically, one after the other.
- **/
-class RoundRobinWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param worker_directory The worker directory.
-   * @param start_worker_thread_index The ID of the worker that the first call
-   *        to getNextWorkerID() should return.
-   *
-   * @warning The first ID is computed modulo the number of workers at the
-   *          time of the getNextWorkerID() call. If the number of workers
-   *          changes between construction and that call, the first ID
-   *          returned may differ from start_worker_thread_index.
-   **/
-  RoundRobinWorkerSelectionPolicy(const WorkerDirectory &worker_directory,
-                                  const std::size_t start_worker_thread_index)
-      : WorkerSelectionPolicy(worker_directory),
-        // Remember the slot just before the requested start (wrapping to the
-        // last worker for a start of 0), so that the first advance lands on it.
-        previous_worker_thread_index_(
-            start_worker_thread_index == 0
-                ? worker_directory.getNumWorkers() - 1
-                : start_worker_thread_index - 1) {}
-
-  std::size_t getNextWorkerID() override {
-    const std::size_t num_workers = worker_directory_.getNumWorkers();
-    const std::size_t next_worker_thread_index =
-        (previous_worker_thread_index_ + 1) % num_workers;
-    previous_worker_thread_index_ = next_worker_thread_index;
-    return next_worker_thread_index;
-  }
-
- private:
-  // The worker ID handed out most recently (or the slot preceding the start).
-  std::size_t previous_worker_thread_index_;
-
-  DISALLOW_COPY_AND_ASSIGN(RoundRobinWorkerSelectionPolicy);
-};
-
-/**
- * @brief Policy that always picks the worker which the directory reports as
- *        least loaded, thereby balancing the load among workers.
- **/
-class LoadBalancingWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param worker_directory The worker directory.
-   **/
-  explicit LoadBalancingWorkerSelectionPolicy(
-      const WorkerDirectory &worker_directory)
-      : WorkerSelectionPolicy(worker_directory) {}
-
-  std::size_t getNextWorkerID() override {
-    // The pair is (logical worker ID, number of queued workorders).
-    const std::pair<std::size_t, std::size_t> least_loaded =
-        worker_directory_.getLeastLoadedWorker();
-    return least_loaded.first;
-  }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(LoadBalancingWorkerSelectionPolicy);
-};
-
-/**
- * @brief Policy that picks a worker uniformly at random.
- **/
-class RandomWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
-  /**
-   * @brief Constructor. Seeds the random engine from std::random_device.
-   *
-   * @param worker_directory The worker directory.
-   **/
-  explicit RandomWorkerSelectionPolicy(const WorkerDirectory &worker_directory)
-      : WorkerSelectionPolicy(worker_directory),
-        mt_(std::random_device{}()) {}
-
-  std::size_t getNextWorkerID() override {
-    // The distribution is rebuilt on every call, so it always spans the
-    // directory's current number of workers.
-    const std::size_t last_worker_thread_index =
-        worker_directory_.getNumWorkers() - 1;
-    std::uniform_int_distribution<std::size_t> pick_worker(
-        0, last_worker_thread_index);
-    return pick_worker(mt_);
-  }
-
- private:
-  // The random engine that drives the selection.
-  std::mt19937_64 mt_;
-
-  DISALLOW_COPY_AND_ASSIGN(RandomWorkerSelectionPolicy);
-};
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
deleted file mode 100644
index 32437c3..0000000
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include <cstdlib>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogAttribute.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "query_execution/BlockLocator.hpp"
-#include "query_execution/BlockLocatorUtil.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "types/TypeFactory.hpp"
-#include "types/TypeID.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::AnnotatedMessage;
-using tmb::MessageBus;
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-// Test fixture that runs a BlockLocator on a message bus next to a
-// StorageManager registered under its own block domain, and offers helpers
-// that compare the StorageManager's and the BlockLocator's view of a block.
-class BlockLocatorTest : public ::testing::Test {
- protected:
-  static const char kStoragePath[];
-  static const char kDomainNetworkAddress[];
-
-  // Waits for the BlockLocator thread, to which TearDown() has sent a
-  // PoisonMessage.
-  ~BlockLocatorTest() {
-    locator_->join();
-  }
-
-  // Starts the BlockLocator, connects a worker client to the bus, registers
-  // the message types that client sends and receives, obtains a block domain
-  // from the locator and creates the StorageManager under test.
-  virtual void SetUp() {
-    bus_.Initialize();
-
-    locator_.reset(new BlockLocator(&bus_));
-    locator_->start();
-
-    worker_client_id_ = bus_.Connect();
-
-    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
-
-    bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
-
-    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
-
-    // getBlockDomain() also reports the locator's client ID through its
-    // output parameter.
-    block_domain_ =
-        block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_);
-    DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
-
-    storage_manager_.reset(
-        new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
-  }
-
-  // Destroys the StorageManager first, then sends a PoisonMessage to the
-  // BlockLocator; the destructor joins the locator thread afterwards.
-  virtual void TearDown() {
-    storage_manager_.reset();
-
-    TaggedMessage message(kPoisonMessage);
-
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') sent PoisonMessage (typed '" << kPoisonMessage
-              << "') to BlockLocator (id '" << locator_client_id_ << "')";
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           worker_client_id_,
-                                           locator_client_id_,
-                                           move(message)));
-  }
-
-  // Asks the BlockLocator which block domains it has recorded for 'block'
-  // and returns the domains listed in its response.
-  vector<block_id_domain> getPeerDomains(const block_id block) {
-    serialization::BlockMessage proto;
-    proto.set_block_id(block);
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    // NOTE(review): the buffer is freed right after the TaggedMessage is
-    // built, so TaggedMessage presumably copies the payload -- confirm.
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kLocateBlockMessage);
-    free(proto_bytes);
-
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage
-              << "') to BlockLocator";
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           worker_client_id_,
-                                           locator_client_id_,
-                                           move(message)));
-
-    // NOTE(review): presumably a blocking receive -- confirm against the TMB
-    // MessageBus::Receive() documentation.
-    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
-    LOG(INFO) << "Worker (id '" << worker_client_id_
-              << "') received LocateBlockResponseMessage from BlockLocator";
-
-    serialization::LocateBlockResponseMessage response_proto;
-    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-    vector<block_id_domain> domains;
-    domains.reserve(response_proto.block_domains_size());
-    for (int i = 0; i < response_proto.block_domains_size(); ++i) {
-      domains.push_back(response_proto.block_domains(i));
-    }
-
-    return domains;
-  }
-
-  // Expects that both the StorageManager and the BlockLocator report exactly
-  // one location for 'block': this fixture's own domain.
-  void checkLoaded(const block_id block) {
-    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
-    // ASSERT rather than EXPECT: element 0 is accessed below, which would be
-    // out of range for an empty vector.
-    ASSERT_EQ(1u, peer_domain_network_addresses.size());
-    EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
-
-    const vector<block_id_domain> domains = getPeerDomains(block);
-    ASSERT_EQ(1u, domains.size());
-    EXPECT_EQ(block_domain_, domains[0]);
-  }
-
-  // Expects that neither the StorageManager nor the BlockLocator reports any
-  // location for 'block'.
-  void checkEvicted(const block_id block) {
-    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
-    EXPECT_TRUE(peer_domain_network_addresses.empty());
-
-    const vector<block_id_domain> domains = getPeerDomains(block);
-    EXPECT_TRUE(domains.empty());
-  }
-
-  // The bus client ID used by the test to talk to the BlockLocator.
-  tmb::client_id worker_client_id_;
-
-  // The block domain obtained from the BlockLocator in SetUp().
-  block_id_domain block_domain_;
-  unique_ptr<StorageManager> storage_manager_;
-
- private:
-  MessageBusImpl bus_;
-
-  unique_ptr<BlockLocator> locator_;
-  tmb::client_id locator_client_id_;
-};
-
-// Storage path handed to the StorageManager created in SetUp().
-const char BlockLocatorTest::kStoragePath[] = "./block_locator_test_data/";
-// Placeholder network address passed to getBlockDomain() in SetUp(); it is
-// also the address checkLoaded() expects the StorageManager to report.
-const char BlockLocatorTest::kDomainNetworkAddress[] = "ip:port";
-
-// Walks a StorageBlock through create -> save + evict -> get -> delete and
-// checks after every step whether the block is reported as loaded or evicted.
-TEST_F(BlockLocatorTest, BlockTest) {
-  CatalogRelation relation(nullptr, "rel");
-  relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
-
-  // A freshly created block is expected to be reported as loaded.
-  const block_id block =
-      storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout());
-  checkLoaded(block);
-
-  // Saving must succeed before the block is evicted; afterwards no location
-  // is expected to be reported for it.
-  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block));
-  storage_manager_->evictBlockOrBlob(block);
-  checkEvicted(block);
-
-  // Fetch the block again; the reference itself is dropped right away at the
-  // end of the scope, only the effect of getBlock() is of interest here.
-  {
-    const BlockReference block_ref = storage_manager_->getBlock(block, relation);
-  }
-  checkLoaded(block);
-
-  // After deleting the block's file no location is expected any more.
-  storage_manager_->deleteBlockOrBlobFile(block);
-  checkEvicted(block);
-}
-
-// Walks a StorageBlob through create -> save + evict -> get -> delete and
-// checks after every step whether the blob is reported as loaded or evicted.
-TEST_F(BlockLocatorTest, BlobTest) {
-  // A freshly created blob is expected to be reported as loaded.
-  const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
-  checkLoaded(blob);
-
-  // Saving must succeed before the blob is evicted; afterwards no location
-  // is expected to be reported for it.
-  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob));
-  storage_manager_->evictBlockOrBlob(blob);
-  checkEvicted(blob);
-
-  // Fetch the blob again; the reference itself is dropped right away at the
-  // end of the scope, only the effect of getBlob() is of interest here.
-  {
-    const BlobReference blob_ref = storage_manager_->getBlob(blob);
-  }
-  checkLoaded(blob);
-
-  // After deleting the blob's file no location is expected any more.
-  storage_manager_->deleteBlockOrBlobFile(blob);
-  checkEvicted(blob);
-}
-
-}  // namespace quickstep
-
-// Test entry point: initializes glog, gflags and googletest (in that order)
-// and then runs every registered test.
-int main(int argc, char **argv) {
-  google::InitGoogleLogging(argv[0]);
-
-  // Parse the command line flags so that FLAGS_buffer_pool_slots is honored
-  // in StorageManager.
-  gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-  ::testing::InitGoogleTest(&argc, argv);
-
-  const int test_status = RUN_ALL_TESTS();
-  return test_status;
-}