You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:40 UTC
[17/51] [abbrv] [partial] incubator-quickstep git commit: remove c++
files
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
deleted file mode 100644
index 6538d48..0000000
--- a/query_execution/Shiftboss.hpp
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
-
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <unordered_map>
-
-#include "catalog/CatalogDatabaseCache.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace quickstep {
-
-class StorageManager;
-
-namespace serialization {
-class CatalogDatabase;
-class QueryContext;
-} // namespace serialization
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief The Shiftboss accepts workorder protos from Foreman, and assigns
- * the workorders to workers.
- **/
-class Shiftboss : public Thread {
- public:
- /**
- * @brief Constructor.
- *
- * @param bus A pointer to the TMB.
- * @param storage_manager The StorageManager to use.
- * @param workers A pointer to the WorkerDirectory.
- * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
- *
- * @note If cpu_id is not specified, the Shiftboss thread may be moved
- * around on different CPUs by the OS.
- **/
- Shiftboss(tmb::MessageBus *bus,
- StorageManager *storage_manager,
- WorkerDirectory *workers,
- const int cpu_id = -1)
- : bus_(DCHECK_NOTNULL(bus)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)),
- workers_(DCHECK_NOTNULL(workers)),
- cpu_id_(cpu_id),
- shiftboss_client_id_(tmb::kClientIdNone),
- foreman_client_id_(tmb::kClientIdNone),
- max_msgs_per_worker_(1),
- start_worker_index_(0u) {
- // Check to have at least one Worker.
- DCHECK_GT(workers->getNumWorkers(), 0u);
-
- shiftboss_client_id_ = bus_->Connect();
- LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
- DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
-
- // Messages between Foreman and Shiftboss.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
-
- // Message sent to Worker.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
-
- // Forward the following message types from Foreman to Workers.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
-
- // Forward the following message types from Workers to Foreman.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
-
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-
- // Clean up query execution states, i.e., QueryContext.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
-
- // Stop itself.
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
- // Stop all workers.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
-
- for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
- worker_addresses_.AddRecipient(workers_->getClientID(i));
- }
-
- registerWithForeman();
- }
-
- ~Shiftboss() override {
- }
-
- /**
- * @brief Get the TMB client ID of Shiftboss thread.
- *
- * @return TMB client ID of shiftboss thread.
- **/
- inline tmb::client_id getBusClientID() const {
- return shiftboss_client_id_;
- }
-
- /**
- * @brief Get the Work Order processing capacity of all Workers managed by
- * Shiftboss during a single round of WorkOrder dispatch.
- **/
- inline std::size_t getWorkOrderCapacity() const {
- DCHECK_NE(max_msgs_per_worker_, 0u);
- return max_msgs_per_worker_ * workers_->getNumWorkers();
- }
-
- /**
- * @brief Get the Worker to assign WorkOrders for execution. Block to wait if
- * all Workers have reached their capacity for queued WorkOrders.
- **/
- // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
- // normal Work Order protos from Foreman and the generated rebuild Work Orders.
- inline std::size_t getSchedulableWorker();
-
- /**
- * @brief Set the maximum number of messages that should be allocated to each
- * worker during a single round of WorkOrder dispatch.
- *
- * @param max_msgs_per_worker Maximum number of messages.
- **/
- inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
- max_msgs_per_worker_ = max_msgs_per_worker;
- }
-
- protected:
- /**
- * @brief The shiftboss receives workorders, and based on the response it
- * assigns workorders to workers.
- *
- * @note The workers who get the messages from the Shiftboss execute and
- * subsequently delete the WorkOrder contained in the message.
- **/
- void run() override;
-
- private:
- void registerWithForeman();
-
- void processShiftbossRegistrationResponseMessage();
-
- /**
- * @brief Process the query initiate message and ack back.
- *
- * @param query_id The given query id.
- * @param catalog_database_cache_proto The proto used to update
- * CatalogDatabaseCache.
- * @param query_context_proto The QueryContext proto.
- **/
- void processQueryInitiateMessage(const std::size_t query_id,
- const serialization::CatalogDatabase &catalog_database_cache_proto,
- const serialization::QueryContext &query_context_proto);
-
- /**
- * @brief Process the RebuildWorkOrder initiate message and ack back.
- *
- * @param query_id The ID of the query to which this RebuildWorkOrder initiate
- * message belongs.
- * @param op_index The index of the operator for rebuild work orders.
- * @param dest_index The InsertDestination index in QueryContext to rebuild.
- * @param rel_id The relation that needs to generate rebuild work orders.
- **/
- void processInitiateRebuildMessage(const std::size_t query_id,
- const std::size_t op_index,
- const QueryContext::insert_destination_id dest_index,
- const relation_id rel_id);
-
- // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
- // and Shiftboss and Worker thread pool.
- tmb::MessageBus *bus_;
-
- CatalogDatabaseCache database_cache_;
- StorageManager *storage_manager_;
- WorkerDirectory *workers_;
-
- // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
- const int cpu_id_;
-
- tmb::client_id shiftboss_client_id_, foreman_client_id_;
-
- // Unique per Shiftboss instance.
- std::uint64_t shiftboss_index_;
-
- // TMB recipients for all workers managed by this Shiftboss.
- tmb::Address worker_addresses_;
-
- // During a single round of WorkOrder dispatch, a Worker should be allocated
- // at most these many WorkOrders.
- std::size_t max_msgs_per_worker_;
-
- // The worker index for scheduling Work Order.
- std::size_t start_worker_index_;
-
- // QueryContexts per query.
- std::unordered_map<std::size_t, std::unique_ptr<QueryContext>> query_contexts_;
-
- DISALLOW_COPY_AND_ASSIGN(Shiftboss);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ShiftbossDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ShiftbossDirectory.hpp b/query_execution/ShiftbossDirectory.hpp
deleted file mode 100644
index 88d3039..0000000
--- a/query_execution/ShiftbossDirectory.hpp
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
-
-#include <cstddef>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class which keeps the metadata about the shiftbosses.
- *
- * @note This class is intended to be used only by ForemanDistributed thread.
- * Therefore, none of the methods in this class are thread-safe.
- **/
-class ShiftbossDirectory {
- public:
- /**
- * @brief Constructor.
- **/
- ShiftbossDirectory() = default;
-
- /**
- * @brief Add the Shiftboss.
- *
- * @param shiftboss_id the TMB client ID of Shiftboss thread.
- * @param capacity The Work Order processing capacity of Shiftboss.
- **/
- void addShiftboss(const tmb::client_id shiftboss_id,
- const std::size_t capacity) {
- client_id_indices_.emplace(shiftboss_id, client_ids_.size());
- client_ids_.push_back(shiftboss_id);
- work_order_capacities_.push_back(capacity);
- num_queued_work_orders_.push_back(0u);
- }
-
- /**
- * @brief Whether the ShiftbossDirectory has any Shiftboss.
- *
- * @return True if there is no Shiftboss in ShiftbossDirectory. Otherwise false.
- **/
- bool empty() const {
- DCHECK_EQ(client_ids_.empty(), client_id_indices_.empty());
- DCHECK_EQ(client_ids_.empty(), work_order_capacities_.empty());
- DCHECK_EQ(client_ids_.empty(), num_queued_work_orders_.empty());
-
- return client_ids_.empty();
- }
-
- /**
- * @brief Get the number of Shiftboss in ShiftbossDirectory.
- *
- * @return The number of Shiftboss in ShiftbossDirectory.
- **/
- std::size_t size() const {
- DCHECK_EQ(client_ids_.size(), client_id_indices_.size());
- DCHECK_EQ(client_ids_.size(), work_order_capacities_.size());
- DCHECK_EQ(client_ids_.size(), num_queued_work_orders_.size());
-
- return client_ids_.size();
- }
-
- /**
- * @brief Get the TMB client ID of the specified Shiftboss.
- *
- * @param shiftboss_index The index of Shiftboss.
- *
- * @return The TMB client ID of the given Shiftboss.
- **/
- tmb::client_id getClientId(const std::size_t shiftboss_index) const {
- DCHECK_LT(shiftboss_index, size());
- return client_ids_[shiftboss_index];
- }
-
- /**
- * @brief Get the Shiftboss index from the specified client id.
- *
- * @param shiftboss_id The TMB client ID of Shiftboss.
- *
- * @return The index of the given Shiftboss.
- **/
- std::size_t getShiftbossIndex(const tmb::client_id shiftboss_id) const {
- const auto it = client_id_indices_.find(shiftboss_id);
- DCHECK(it != client_id_indices_.end());
-
- return it->second;
- }
-
- /**
- * @brief Whether the given Shiftboss has reached its capacity.
- *
- * @param shiftboss_index The index of Shiftboss.
- *
- * @return True if reached the capacity. Otherwise false.
- **/
- bool hasReachedCapacity(const std::size_t shiftboss_index) const {
- DCHECK_LT(shiftboss_index, size());
- return num_queued_work_orders_[shiftboss_index] >= work_order_capacities_[shiftboss_index];
- }
-
- /**
- * @brief Add the number of new work orders for the given Shiftboss.
- *
- * @param shiftboss_index The index of Shiftboss.
- * @param num_new_work_orders The number of new work orders that will be
- * executed on the Shiftboss indexed by 'shiftboss_index'.
- **/
- void addNumQueuedWorkOrders(const std::size_t shiftboss_index,
- const std::size_t num_new_work_orders) {
- num_queued_work_orders_[shiftboss_index] += num_new_work_orders;
- }
-
- /**
- * @brief Increase the number of queued workorders for the given Shiftboss by 1.
- *
- * @param shiftboss_index The index of Shiftboss.
- **/
- void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
- DCHECK_LT(shiftboss_index, size());
- ++num_queued_work_orders_[shiftboss_index];
- }
-
- /**
- * @brief Decrease the number of queued workorders for the given Shiftboss by 1.
- *
- * @param shiftboss_index The index of Shiftboss.
- **/
- void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
- DCHECK_LT(shiftboss_index, size());
- DCHECK_GE(num_queued_work_orders_[shiftboss_index], 1u);
- --num_queued_work_orders_[shiftboss_index];
- }
-
- private:
- // The TMB client IDs of Shiftbosses.
- // TODO(zuyu): Support deletions, as Shiftbosses go down.
- std::vector<tmb::client_id> client_ids_;
-
- // The map from the TMB client ID of Shiftboss to its index in 'client_ids_'.
- std::unordered_map<tmb::client_id, std::size_t> client_id_indices_;
-
- // The max number of WorkOrders per Shiftboss.
- std::vector<std::size_t> work_order_capacities_;
-
- // The number of WorkOrders queued for execution per Shiftboss, and the value
- // should not be greater than that of work_order_capacities_.
- std::vector<std::size_t> num_queued_work_orders_;
-
- DISALLOW_COPY_AND_ASSIGN(ShiftbossDirectory);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_DIRECTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrderProtosContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrderProtosContainer.hpp b/query_execution/WorkOrderProtosContainer.hpp
deleted file mode 100644
index b037332..0000000
--- a/query_execution/WorkOrderProtosContainer.hpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <queue>
-#include <vector>
-
-#include "relational_operators/WorkOrder.pb.h" // IWYU pragma: keep
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
- /**
- * @brief A container used in the distributed version to hold the normal
- * (non-rebuild) WorkOrder protos for a given query.
- *
- * @note This container stays alive during the lifetime of the query.
- **/
-class WorkOrderProtosContainer {
- public:
- /**
- * @brief Constructor
- *
- * @param num_operators Number of operators in the query DAG.
- **/
- explicit WorkOrderProtosContainer(const std::size_t num_operators)
- : num_operators_(num_operators),
- operator_containers_(num_operators_) {
- DCHECK_NE(num_operators_, 0u);
- }
-
- /**
- * @brief Destructor.
- *
- * @note If the query is executed normally, we should never encounter a
- * situation where at the time of deletion the WorkOrderProtosContainer has
- * pending WorkOrders.
- **/
- ~WorkOrderProtosContainer() {
- for (std::size_t op = 0; op < num_operators_; ++op) {
- if (hasWorkOrderProto(op)) {
- LOG(WARNING) << "Destroying a WorkOrderProtosContainer that still has pending WorkOrder protos.";
- break;
- }
- }
- }
-
- /**
- * @brief Check if there are some pending WorkOrders for the given operator.
- *
- * @param operator_index Index of the operator.
- *
- * @return If there are pending WorkOrders.
- **/
- bool hasWorkOrderProto(const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return !operator_containers_[operator_index].empty();
- }
-
- /**
- * @brief Get a WorkOrder for a given operator.
- *
- * @param operator_index The index of the operator.
- *
- * @return A WorkOrder proto released to the caller. If no WorkOrder proto is
- * available, return nullptr.
- **/
- serialization::WorkOrder* getWorkOrderProto(const std::size_t operator_index) {
- DCHECK_LT(operator_index, num_operators_);
-
- if (operator_containers_[operator_index].empty()) {
- return nullptr;
- }
-
- serialization::WorkOrder *proto =
- operator_containers_[operator_index].front().release();
- operator_containers_[operator_index].pop();
-
- return proto;
- }
-
- /**
- * @brief Add a WorkOrder generated from a given
- * operator.
- *
- * @param proto A pointer to the WorkOrder proto to be added.
- * @param operator_index The index of the operator in the query DAG.
- **/
- void addWorkOrderProto(serialization::WorkOrder *proto,
- const std::size_t operator_index) {
- DCHECK(proto != nullptr);
- DCHECK_LT(operator_index, num_operators_);
-
- operator_containers_[operator_index].emplace(
- std::unique_ptr<serialization::WorkOrder>(proto));
- }
-
- /**
- * @brief Get the number of all pending WorkOrders
- * for a given operator.
- *
- * @param operator_index The index of the operator.
- *
- * @return The number of pending WorkOrders.
- **/
- std::size_t getNumWorkOrderProtos(const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return operator_containers_[operator_index].size();
- }
-
- private:
- const std::size_t num_operators_;
-
- std::vector<std::queue<std::unique_ptr<serialization::WorkOrder>>> operator_containers_;
-
- DISALLOW_COPY_AND_ASSIGN(WorkOrderProtosContainer);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrdersContainer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.cpp b/query_execution/WorkOrdersContainer.cpp
deleted file mode 100644
index a16a8e6..0000000
--- a/query_execution/WorkOrdersContainer.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/WorkOrdersContainer.hpp"
-
-#include <algorithm>
-#include <cstddef>
-#include <list>
-#include <memory>
-#include <vector>
-
-#include "relational_operators/WorkOrder.hpp"
-
-#include "glog/logging.h"
-
-using std::unique_ptr;
-
-namespace quickstep {
-
-WorkOrdersContainer::~WorkOrdersContainer() {
- // For each operator ..
- for (std::size_t op = 0; op < num_operators_; ++op) {
- if (hasNormalWorkOrder(op) || hasRebuildWorkOrder(op)) {
- LOG(WARNING) << "Destroying a WorkOrdersContainer that still has pending WorkOrders.";
- break;
- }
- }
-}
-
-WorkOrder* WorkOrdersContainer::InternalListContainer::getWorkOrderForNUMANode(
- const int numa_node) {
- for (std::list<unique_ptr<WorkOrder>>::iterator it = workorders_.begin();
- it != workorders_.end();
- ++it) {
- const std::vector<int> &numa_nodes = (*it)->getPreferredNUMANodes();
- if (!numa_nodes.empty()) {
- if (std::find(numa_nodes.begin(), numa_nodes.end(), numa_node) !=
- numa_nodes.end()) {
- WorkOrder *work_order = it->release();
- workorders_.erase(it);
- return work_order;
- }
- }
- }
- return nullptr;
-}
-
-void WorkOrdersContainer::OperatorWorkOrdersContainer::addWorkOrder(
- WorkOrder *workorder) {
- const std::vector<int> &numa_nodes = workorder->getPreferredNUMANodes();
- if (!numa_nodes.empty()) {
- if (numa_nodes.size() == 1) {
- // This WorkOrder prefers exactly one NUMA node.
- single_numa_node_workorders_[numa_nodes.front()].addWorkOrder(
- workorder);
- } else {
- // This WorkOrder prefers more than one NUMA node.
- multiple_numa_nodes_workorders_.addWorkOrder(workorder);
- }
- } else {
- numa_agnostic_workorders_.addWorkOrder(workorder);
- }
-}
-
-std::size_t
- WorkOrdersContainer::InternalListContainer::getNumWorkOrdersForNUMANode(
- const int numa_node) const {
- std::size_t num_workorders = 0;
- for (const unique_ptr<WorkOrder> &work_order : workorders_) {
- const std::vector<int> &numa_nodes = work_order->getPreferredNUMANodes();
- if (!numa_nodes.empty()) {
- std::vector<int>::const_iterator
- it = std::find(numa_nodes.begin(), numa_nodes.end(), numa_node);
- if (it != numa_nodes.end()) {
- // Found a match.
- ++num_workorders;
- }
- }
- }
- return num_workorders;
-}
-
-bool WorkOrdersContainer::InternalListContainer::hasWorkOrderForNUMANode(
- const int numa_node) const {
- for (const unique_ptr<WorkOrder> &work_order : workorders_) {
- const std::vector<int> &numa_nodes = work_order->getPreferredNUMANodes();
- if (!numa_nodes.empty()) {
- std::vector<int>::const_iterator
- it = std::find(numa_nodes.begin(), numa_nodes.end(), numa_node);
- if (it != numa_nodes.end()) {
- // Found a match.
- return true;
- }
- }
- }
- return false;
-}
-
-WorkOrder* WorkOrdersContainer::OperatorWorkOrdersContainer::getWorkOrder(
- const bool prefer_single_NUMA_node) {
- // This function tries to get any available WorkOrder.
- WorkOrder *workorder = numa_agnostic_workorders_.getWorkOrder();
- if (workorder == nullptr) {
- if (prefer_single_NUMA_node) {
- workorder = getSingleNUMANodeWorkOrderHelper();
- if (workorder == nullptr) {
- workorder = multiple_numa_nodes_workorders_.getWorkOrder();
- }
- } else {
- workorder = multiple_numa_nodes_workorders_.getWorkOrder();
- if (workorder == nullptr) {
- workorder = getSingleNUMANodeWorkOrderHelper();
- }
- }
- }
- return workorder;
-}
-
-WorkOrder* WorkOrdersContainer::OperatorWorkOrdersContainer::
- getSingleNUMANodeWorkOrderHelper() {
- WorkOrder *workorder = nullptr;
- for (PtrVector<InternalQueueContainer>::iterator it =
- single_numa_node_workorders_.begin();
- it != single_numa_node_workorders_.end(); ++it) {
- workorder = it->getWorkOrder();
- if (workorder != nullptr) {
- return workorder;
- }
- }
- return nullptr;
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
deleted file mode 100644
index c1739bc..0000000
--- a/query_execution/WorkOrdersContainer.hpp
+++ /dev/null
@@ -1,515 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
-
-#include <cstddef>
-#include <list>
-#include <memory>
-#include <queue>
-#include <vector>
-
-#include "relational_operators/WorkOrder.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PtrVector.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
- /**
- * @brief A container to hold the WorkOrders for a given query.
- * @note This container stays alive during the lifetime of the query.
- * @note The NUMA node indexing is assumed to start from 0.
- **/
-class WorkOrdersContainer {
- public:
- /**
- * @brief Constructor
- *
- * @param num_operators Number of operators in the query DAG.
- * @param num_numa_nodes Number of NUMA nodes in the system.
- **/
- WorkOrdersContainer(const std::size_t num_operators,
- const std::size_t num_numa_nodes)
- : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
- DEBUG_ASSERT(num_operators != 0);
- for (std::size_t op = 0; op < num_operators; ++op) {
- normal_workorders_.push_back(
- new OperatorWorkOrdersContainer(num_numa_nodes_));
- rebuild_workorders_.push_back(
- new OperatorWorkOrdersContainer(num_numa_nodes_));
- }
- }
-
- /**
- * @brief Destructor.
- *
- * @note If the query is executed normally, we should never encounter a
- * situation where at the time of deletion the WorkOrdersContainer has
- * pending WorkOrders.
- **/
- ~WorkOrdersContainer();
-
- /**
- * @brief Check if there are some pending WorkOrders for the given operator.
- *
- * @param operator_index Index of the operator.
- *
- * @return If there are pending WorkOrders.
- **/
- inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return normal_workorders_[operator_index].hasWorkOrder();
- }
-
- /**
- * @brief Check if there are some pending normal (i.e. non-rebuild) WorkOrders
- * for the given operator which prefer the specified NUMA node.
- *
- * @param operator_index Index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return If there are pending WorkOrders for the given operator which prefer
- * numa_node_id.
- **/
- inline bool hasNormalWorkOrderForNUMANode(
- const std::size_t operator_index, const int numa_node_id) const {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Check if there are some pending rebuild WorkOrders for the given
- * operator.
- *
- * @param operator_index Index of the operator.
- *
- * @return If there are pending rebuild WorkOrders.
- **/
- inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return rebuild_workorders_[operator_index].hasWorkOrder();
- }
-
- /**
- * @brief Check if there are some pending rebuild WorkOrders for the given
- * operator which prefer the specified NUMA node.
- *
- * @param operator_index Index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return If there are pending rebuild WorkOrders for the given operator which
- * prefer numa_node_id.
- **/
- inline bool hasRebuildWorkOrderForNUMANode(
- const std::size_t operator_index, const int numa_node_id) const {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Get a normal (non-rebuild) WorkOrder for a given operator which
- * prefer the given NUMA node.
- *
- * @param operator_index The index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return A WorkOrder which prefers numa_node_id. If no such WorkOrder is
- * available, return nullptr. The caller is responsible for taking the
- * ownership.
- **/
- WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
- const int numa_node_id) {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return normal_workorders_[operator_index].getWorkOrderForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Get a normal (non-rebuild) WorkOrder for a given operator.
- *
- * @param operator_index The index of the operator.
- * @param prefer_single_NUMA_node If true, first try to get workorders which
- * prefer exactly one NUMA node over workorders which list more than
- * one NUMA node as their preference.
- *
- * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
- * caller is responsible for taking the ownership.
- **/
- WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
- const bool prefer_single_NUMA_node = true) {
- DCHECK_LT(operator_index, num_operators_);
- return normal_workorders_[operator_index].getWorkOrder(
- prefer_single_NUMA_node);
- }
-
- /**
- * @brief Get a rebuild WorkOrder for a given operator whch prefer the
- * specified NUMA node.
- *
- * @param operator_index The index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return A WorkOrder that prefers numa_node_id. If no such WorkOrder is
- * available, return nullptr. The caller is responsible for taking the
- * ownership.
- **/
- WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
- const int numa_node_id) {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Get a rebuild WorkOrder for the given operator
- *
- * @param operator_index The index of the operator.
- * @param prefer_single_NUMA_node If true, first try to get workorders which
- * prefer exactly one NUMA node over workorders which list more than
- * one NUMA node as their preference.
- *
- * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
- * caller is responsible for taking the ownership.
- **/
- WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
- const bool prefer_single_NUMA_node = true) {
- DCHECK_LT(operator_index, num_operators_);
- return rebuild_workorders_[operator_index].getWorkOrder(
- prefer_single_NUMA_node);
- }
-
- /**
- * @brief Add a normal (non-rebuild) WorkOrder generated from a given
- * operator.
- *
- * @note Take the ownership of \p workorder.
- * @note The workorder to be added contains information about its preferred
- * NUMA nodes. This information is used to insert the WorkOrder
- * appropriately.
- *
- * @param workorder A pointer to the WorkOrder to be added.
- * @param operator_index The index of the operator in the query DAG.
- **/
- void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
- DCHECK(workorder != nullptr);
- DCHECK_LT(operator_index, num_operators_);
- normal_workorders_[operator_index].addWorkOrder(workorder);
- }
-
- /**
- * @brief Add a rebuild WorkOrder generated from a given operator.
- *
- * @note Take the ownership of \p workorder.
- * @note The workorder to be added contains information about its preferred
- * NUMA nodes. This information is used to insert the WorkOrder
- * appropriately.
- *
- * @param workorder A pointer to the WorkOrder to be added.
- * @param operator_index The index of the operator in the query DAG.
- **/
- void addRebuildWorkOrder(WorkOrder *workorder,
- const std::size_t operator_index) {
- DCHECK(workorder != nullptr);
- DCHECK_LT(operator_index, num_operators_);
- rebuild_workorders_[operator_index].addWorkOrder(workorder);
- }
-
- /**
- * @brief Get the number of pending normal WorkOrders for a given operator
- * which prefer the specified NUMA node.
- *
- * @param operator_index The index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return The number of pending WorkOrders which prefer numa_node_id.
- **/
- inline std::size_t getNumNormalWorkOrdersForNUMANode(
- const std::size_t operator_index, const int numa_node_id) const {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Get the number of all pending normal (i.e. non-rebuild) WorkOrders
- * for a given operator.
- *
- * @param operator_index The index of the operator.
- *
- * @return The number of pending normal WorkOrders.
- **/
- inline std::size_t getNumNormalWorkOrders(
- const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return normal_workorders_[operator_index].getNumWorkOrders();
- }
-
- /**
- * @brief Get the number of pending rebuild WorkOrders for a given operator
- * which prefer the specified NUMA node.
- *
- * @param operator_index The index of the operator.
- * @param numa_node_id The NUMA node.
- *
- * @return The number of pending WorkOrders which prefer numa_node_id.
- **/
- inline std::size_t getNumRebuildWorkOrdersForNUMANode(
- const std::size_t operator_index, const int numa_node_id) const {
- DCHECK_LT(operator_index, num_operators_);
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
- numa_node_id);
- }
-
- /**
- * @brief Get the number of all pending rebuild WorkOrders for a given
- * operator.
- *
- * @param operator_index The index of the operator.
- *
- * @return The number of pending rebuild WorkOrders.
- **/
- inline std::size_t getNumRebuildWorkOrders(
- const std::size_t operator_index) const {
- DCHECK_LT(operator_index, num_operators_);
- return rebuild_workorders_[operator_index].getNumWorkOrders();
- }
-
- private:
- /**
- * @brief An internal queue-based container structure to hold the WorkOrders.
- **/
- class InternalQueueContainer {
- public:
- InternalQueueContainer() {
- }
-
- inline void addWorkOrder(WorkOrder *workorder) {
- workorders_.emplace(std::unique_ptr<WorkOrder>(workorder));
- }
-
- inline WorkOrder* getWorkOrder() {
- if (workorders_.empty()) {
- return nullptr;
- }
-
- WorkOrder *work_order = workorders_.front().release();
- workorders_.pop();
- return work_order;
- }
-
- inline bool hasWorkOrder() const {
- return !workorders_.empty();
- }
-
- inline std::size_t getNumWorkOrders() const {
- return workorders_.size();
- }
-
- private:
- std::queue<std::unique_ptr<WorkOrder>> workorders_;
-
- DISALLOW_COPY_AND_ASSIGN(InternalQueueContainer);
- };
-
- /**
- * @brief An internal list-based container structure to hold the WorkOrders.
- *
- * @note We use this class for WorkOrders whose inputs come from multiple NUMA
- * nodes e.g. a HashJoinWorkOrder whose probe block and the hash table
- * may reside on two different NUMA nodes.
- **/
- class InternalListContainer {
- public:
- InternalListContainer() {
- }
-
- inline void addWorkOrder(WorkOrder *workorder) {
- workorders_.emplace_back(std::unique_ptr<WorkOrder>(workorder));
- }
-
- inline WorkOrder* getWorkOrder() {
- if (workorders_.empty()) {
- return nullptr;
- }
-
- WorkOrder *work_order = workorders_.front().release();
- workorders_.pop_front();
- return work_order;
- }
-
- /**
- * @note This method has O(N) complexity.
- **/
- WorkOrder* getWorkOrderForNUMANode(const int numa_node);
-
- inline bool hasWorkOrder() const {
- return !workorders_.empty();
- }
-
- /**
- * @brief Check if numa_node is in the set of preferred NUMA nodes for at
- * least one WorkOrder in this container.
- *
- * @note This method has O(N) complexity.
- **/
- bool hasWorkOrderForNUMANode(const int numa_node) const;
-
- inline std::size_t getNumWorkOrders() const {
- return workorders_.size();
- }
-
- /**
- * @brief Return the number of WorkOrders that list the numa_node as one of
- * the preferred nodes of execution.
- **/
- std::size_t getNumWorkOrdersForNUMANode(const int numa_node) const;
-
- private:
- std::list<std::unique_ptr<WorkOrder>> workorders_;
-
- DISALLOW_COPY_AND_ASSIGN(InternalListContainer);
- };
-
- /**
- * @brief A container to hold all the WorkOrders generated by one operator.
- **/
- class OperatorWorkOrdersContainer {
- public:
- explicit OperatorWorkOrdersContainer(const std::size_t num_numa_nodes)
- : num_numa_nodes_(num_numa_nodes) {
- for (std::size_t numa_node = 0; numa_node < num_numa_nodes; ++numa_node) {
- single_numa_node_workorders_.push_back(new InternalQueueContainer());
- }
- }
-
- void addWorkOrder(WorkOrder *workorder);
-
- bool hasWorkOrderForNUMANode(const int numa_node_id) const {
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
- multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
- numa_node_id);
- }
-
- bool hasWorkOrder() const {
- if (!(numa_agnostic_workorders_.hasWorkOrder() ||
- multiple_numa_nodes_workorders_.hasWorkOrder())) {
- for (std::size_t i = 0; i < num_numa_nodes_; ++i) {
- if (hasWorkOrderForNUMANode(i)) {
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- std::size_t getNumWorkOrdersForNUMANode(
- const int numa_node_id) const {
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
- multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
- numa_node_id);
- }
-
- inline std::size_t getNumWorkOrders() const {
- std::size_t num_workorders = numa_agnostic_workorders_.getNumWorkOrders();
- // for each NUMA node involved ..
- for (PtrVector<InternalQueueContainer>::const_iterator it =
- single_numa_node_workorders_.begin();
- it != single_numa_node_workorders_.end();
- ++it) {
- // Add the number of workorders for the NUMA node.
- num_workorders += it->getNumWorkOrders();
- }
- // Add the number of workorders who have multiple NUMA nodes as input.
- num_workorders += multiple_numa_nodes_workorders_.getNumWorkOrders();
- return num_workorders;
- }
-
- WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
- DCHECK_GE(numa_node_id, 0);
- DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
- WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
- if (work_order == nullptr) {
- work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
- numa_node_id);
- }
- return work_order;
- }
-
- WorkOrder* getWorkOrder(const bool prefer_single_NUMA_node = true);
-
- private:
- WorkOrder* getSingleNUMANodeWorkOrderHelper();
-
- const std::size_t num_numa_nodes_;
-
- // A container to store NUMA agnostic workorders.
- InternalQueueContainer numa_agnostic_workorders_;
-
- // A vector of containers to store workorders which prefer exactly one NUMA
- // node for execution.
- PtrVector<InternalQueueContainer> single_numa_node_workorders_;
-
- // A container to store workorders which prefer more than one NUMA node for
- // execution.
- InternalListContainer multiple_numa_nodes_workorders_;
- // Note that no workorder should be shared among the *_workorders_ structures.
-
- DISALLOW_COPY_AND_ASSIGN(OperatorWorkOrdersContainer);
- };
-
- const std::size_t num_operators_;
- const std::size_t num_numa_nodes_;
-
- PtrVector<OperatorWorkOrdersContainer> normal_workorders_;
- PtrVector<OperatorWorkOrdersContainer> rebuild_workorders_;
-
- DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
-};
-/** @} */
-
-} // namespace quickstep
-
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
deleted file mode 100644
index 31eb964..0000000
--- a/query_execution/Worker.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/Worker.hpp"
-
-#include <chrono>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <utility>
-
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
-#include "relational_operators/WorkOrder.hpp"
-#include "threading/ThreadIDBasedMap.hpp"
-#include "threading/ThreadUtil.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::size_t;
-using std::uint64_t;
-
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-using serialization::WorkOrderCompletionMessage;
-
-void Worker::run() {
- if (cpu_id_ >= 0) {
- ThreadUtil::BindToCPU(cpu_id_);
- }
- ClientIDMap *thread_id_map = ClientIDMap::Instance();
- thread_id_map->addValue(worker_client_id_);
- for (;;) {
- // Receive() is a blocking call, causing this thread to sleep until next
- // message is received.
- const AnnotatedMessage annotated_msg =
- bus_->Receive(worker_client_id_, 0, true);
- const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- switch (tagged_message.message_type()) {
-#ifdef QUICKSTEP_DISTRIBUTED
- case kShiftbossRegistrationResponseMessage: {
- serialization::ShiftbossRegistrationResponseMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- shiftboss_index_ = proto.shiftboss_index();
- break;
- }
-#endif // QUICKSTEP_DISTRIBUTED
- case kWorkOrderMessage: {
- WorkOrderCompletionMessage proto;
- executeWorkOrderHelper(tagged_message, &proto);
- sendWorkOrderCompleteMessage(
- annotated_msg.sender, proto, kWorkOrderCompleteMessage);
- break;
- }
- case kRebuildWorkOrderMessage: {
- WorkOrderCompletionMessage proto;
- executeWorkOrderHelper(tagged_message, &proto, true /* is_rebuild */);
- sendWorkOrderCompleteMessage(
- annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage);
- break;
- }
- case kPoisonMessage: {
- // Remove the entry from the thread ID based map for this worker thread.
- thread_id_map->removeValue();
- return;
- }
- }
- }
-}
-
-void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const WorkOrderCompletionMessage &proto,
- const message_type_id message_type) {
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage tagged_message(
- static_cast<const void *>(proto_bytes), proto_length, message_type);
- std::free(proto_bytes);
-
- DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
- << "') to Scheduler with TMB client ID " << receiver;
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(
- bus_, worker_client_id_, receiver, std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-}
-
-void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
- WorkOrderCompletionMessage *proto,
- const bool is_rebuild_work_order) {
- std::chrono::time_point<std::chrono::steady_clock> start, end;
- WorkerMessage worker_message(
- *static_cast<const WorkerMessage *>(tagged_message.message()));
- DCHECK(worker_message.getWorkOrder() != nullptr);
- const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
-
- // Start measuring the execution time.
- start = std::chrono::steady_clock::now();
- worker_message.getWorkOrder()->execute();
- end = std::chrono::steady_clock::now();
- delete worker_message.getWorkOrder();
-
- // Convert the measured timestamps to epoch times in microseconds.
- const uint64_t execution_start_time =
- std::chrono::duration_cast<std::chrono::microseconds>(
- start.time_since_epoch()).count();
- const uint64_t execution_end_time =
- std::chrono::duration_cast<std::chrono::microseconds>(
- end.time_since_epoch()).count();
-
- // Construct the proto message.
- proto->set_work_order_type(is_rebuild_work_order ? WorkOrderCompletionMessage::REBUILD
- : WorkOrderCompletionMessage::NORMAL);
- proto->set_operator_index(worker_message.getRelationalOpIndex());
- proto->set_query_id(query_id_for_workorder);
- proto->set_worker_thread_index(worker_thread_index_);
- proto->set_execution_start_time(execution_start_time);
- proto->set_execution_end_time(execution_end_time);
-
-#ifdef QUICKSTEP_DISTRIBUTED
- proto->set_shiftboss_index(shiftboss_index_);
-#endif // QUICKSTEP_DISTRIBUTED
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
deleted file mode 100644
index feb7c7d..0000000
--- a/query_execution/Worker.hpp
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
-
-#include <cstddef>
-#include <cstdint>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
-#include "threading/Thread.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-
-namespace tmb { class TaggedMessge; }
-
-namespace quickstep {
-
-namespace serialization { class WorkOrderCompletionMessage; }
-
-/** \addtogroup QueryExecution
- * @{
- */
-
- /**
- * @brief A worker thread executes WorkOrders which it receives from Foreman.
- **/
-class Worker : public Thread {
- public:
- /**
- * @brief Constructor
- *
- * @param worker_thread_index The unique index of this worker thread in
- * WorkerDirectory.
- * @param bus A pointer to the TMB.
- * @param cpu_id The ID of the CPU to which this worker thread can be pinned.
- *
- * @note If cpu_id is not specified, worker thread can be possibly moved
- * around on different CPUs by the OS.
- **/
- Worker(const std::size_t worker_thread_index,
- MessageBus *bus,
- int cpu_id = -1)
- : worker_thread_index_(worker_thread_index),
- bus_(bus),
- cpu_id_(cpu_id) {
- DEBUG_ASSERT(bus_ != nullptr);
- worker_client_id_ = bus_->Connect();
-
- bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(worker_client_id_,
- kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage);
-
- bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
- bus_->RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
- bus_->RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
-
-#ifdef QUICKSTEP_DISTRIBUTED
- bus_->RegisterClientAsReceiver(worker_client_id_, kShiftbossRegistrationResponseMessage);
-#endif // QUICKSTEP_DISTRIBUTED
- }
-
- ~Worker() override {}
-
- /**
- * @brief Get the TMB client ID of this worker.
- *
- * @return TMB client ID of this worker.
- **/
- inline client_id getBusClientID() const {
- return worker_client_id_;
- }
-
- protected:
- /**
- * @brief Receive workorder one at a time and execute it. After execution, send
- * a completion message to the Foreman and delete the WorkOrder.
- *
- * @note On receiving a poison message, the method terminates.
- **/
- void run() override;
-
- private:
- /**
- * @brief A helper method to execute the WorkOrder and construct a
- * completion message.
- *
- * @note Right now a single helper method works for all message types.
- * If different message types need to collect different statistics for
- * the WorkOrder execution, we need to create different helper methods,
- * one for each message type.
- *
- * @param tagged_message The TaggedMessage which consists of the WorkOrder.
- * @param proto The proto message to be sent.
- * @param is_rebuild_work_order Whether it is used for a RebuildWorkOrder.
- **/
- void executeWorkOrderHelper(const TaggedMessage &tagged_message,
- serialization::WorkOrderCompletionMessage *proto,
- const bool is_rebuild_work_order = false);
-
- /**
- * @brief A helper method to send the WorkOrder completion message.
- *
- * @param receiver The TMB client ID of the receiver.
- * @param proto The proto message to be sent.
- * @param message_type The ID of the type of the message being sent.
- **/
- void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
- const serialization::WorkOrderCompletionMessage &proto,
- const message_type_id message_type);
-
- const std::size_t worker_thread_index_;
- MessageBus *bus_;
-
- const int cpu_id_;
- client_id worker_client_id_;
-
-#ifdef QUICKSTEP_DISTRIBUTED
- std::size_t shiftboss_index_;
-#endif // QUICKSTEP_DISTRIBUTED
-
- DISALLOW_COPY_AND_ASSIGN(Worker);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerDirectory.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerDirectory.hpp b/query_execution/WorkerDirectory.hpp
deleted file mode 100644
index be5cfc1..0000000
--- a/query_execution/WorkerDirectory.hpp
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
-
-#include <algorithm>
-#include <cstddef>
-#include <iterator>
-#include <utility>
-#include <vector>
-
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-/**
- * @brief A class which keeps the metadata about the workers.
- *
- * @note The number of workorders per worker used in this class are as viewed by
- * Foreman. An alternative implementation can use TMB to query the queue
- * lengths of each worker.
- *
- * @note This class is intended to be used only by Foreman thread. Therefore
- * none of the methods in this class are thread-safe.
- **/
-class WorkerDirectory {
- public:
- /**
- * @brief Constructor.
- *
- * @param num_workers The number of workers.
- * @param client_ids A vector of TMB client IDs for the workers.
- * @param numa_node_ids A vector of NUMA node IDs where the workers are
- * pinned. If a worker is not pinned to any NUMA node, the NUMA node ID
- * for that worker is -1.
- **/
- WorkerDirectory(const std::size_t num_workers,
- const std::vector<client_id> &client_ids,
- const std::vector<int> &numa_node_ids)
- : num_workers_(num_workers),
- num_queued_workorders_(num_workers, 0),
- numa_node_ids_(numa_node_ids),
- client_ids_(client_ids) {
- DEBUG_ASSERT(num_workers > 0);
- DEBUG_ASSERT(client_ids.size() == num_workers);
- DEBUG_ASSERT(numa_node_ids.size() == num_workers);
- }
-
- /**
- * @brief Get the number of worker threads.
- *
- * @return The number of worker threads at the time the function is called.
- **/
- inline const std::size_t getNumWorkers() const {
- return num_workers_;
- }
-
- /**
- * @brief Get the number of queued workorders for the given worker.
- *
- * @note The queued number of workorders consist of the workorders waiting for
- * execution and the workorder being executed by the specified worker
- * at the time this function is called.
- *
- * @param worker_thread_index The logical ID of the given worker.
- *
- * @return The number of queued workorders.
- **/
- inline const std::size_t getNumQueuedWorkOrders(
- const std::size_t worker_thread_index) const {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- return num_queued_workorders_[worker_thread_index];
- }
-
- /**
- * @brief Increment the number of queued workorders for the given worker by 1.
- *
- * @param worker_thread_index The logical ID of the given worker.
- **/
- inline void incrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- ++num_queued_workorders_[worker_thread_index];
- }
-
- /**
- * @brief Decrement the number of queued workorders for the given worker by 1.
- *
- * @param worker_thread_index The logical ID of the given worker.
- **/
- inline void decrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- DEBUG_ASSERT(num_queued_workorders_[worker_thread_index] >= 1);
- --num_queued_workorders_[worker_thread_index];
- }
-
- /**
- * @brief Get the NUMA node where the specified worker is pinned to.
- *
- * @param worker_thread_index The logical ID of the given worker.
- *
- * @return The NUMA node ID where the given worker is pinned. If the worker
- * hasn't been pinned to any NUMA node, this value is -1.
- **/
- inline int getNUMANode(const std::size_t worker_thread_index) const {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- return numa_node_ids_[worker_thread_index];
- }
-
- /**
- * @brief Get the TMB client ID of the specified worker.
- *
- * @param worker_thread_index The logical ID of the given worker.
- *
- * @return The TMB client ID of the given worker.
- **/
- inline const client_id getClientID(const std::size_t worker_thread_index) const {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- return client_ids_[worker_thread_index];
- }
-
- /**
- * @brief Generate address of a worker.
- *
- * @param worker_thread_index The logical ID of the given worker.
- *
- * @return TMB Address of the given worker.
- **/
- inline Address getWorkerAddress(std::size_t worker_thread_index) const {
- DEBUG_ASSERT(worker_thread_index < num_workers_);
- Address worker_address;
- worker_address.AddRecipient(client_ids_[worker_thread_index]);
- return worker_address;
- }
-
- /**
- * @brief Add metadata about a new worker.
- *
- * @param cid The TMB client ID of the new worker.
- * @param numa_node_id The NUMA node ID where the new worker is pinned.
- **/
- inline void addWorker(const client_id cid, const int numa_node_id) {
- ++num_workers_;
- num_queued_workorders_.push_back(0);
- numa_node_ids_.push_back(numa_node_id);
- client_ids_.push_back(cid);
- }
-
- /**
- * @brief Get information about the least loaded worker at the time the
- * function is called.
- *
- * TODO(harshad) This method performs a linear search. Using a dynamic
- * priority queue to track worker loads can help this method
- * execute faster.
- * TODO(harshad) Extend this method to find least loaded workers on a given
- * NUMA node.
- *
- * @note If there are multiple workers with the least amount of load, the
- * worker with the smaller logical ID is returned.
- *
- * @return A pair: First element is the logical ID of the least loaded worker
- * and second element is the number of queued workorders for this
- * worker at the time the function is called.
- **/
- std::pair<std::size_t, std::size_t> getLeastLoadedWorker() const {
- std::vector<std::size_t>::const_iterator min_element_iter =
- std::min_element(std::begin(num_queued_workorders_),
- std::end(num_queued_workorders_));
- DEBUG_ASSERT(min_element_iter != num_queued_workorders_.end());
- const std::size_t least_loaded_worker_thread_index =
- std::distance(num_queued_workorders_.begin(), min_element_iter);
- return std::make_pair(least_loaded_worker_thread_index, *min_element_iter);
- }
-
- /**
- * @brief Get information about the most loaded worker at the time the
- * function is called.
- *
- * TODO(harshad) This method performs a linear search. Using a dynamic
- * priority queue to track worker loads can help this method
- * execute faster.
- * TODO(harshad) Extend this method to find most loaded workers on a given
- * NUMA node.
- *
- * @note If there are multiple workers with the maximum amount of load, the
- * worker with the smaller logical ID is returned.
- *
- * @return A pair: First element is the logical ID of the most loaded worker
- * and second element is the number of queued workorders for this
- * worker at the time the function is called.
- **/
- std::pair<std::size_t, std::size_t> getMostLoadedWorker() const {
- std::vector<std::size_t>::const_iterator max_element_iter =
- std::max_element(std::begin(num_queued_workorders_),
- std::end(num_queued_workorders_));
- DEBUG_ASSERT(max_element_iter != num_queued_workorders_.end());
- const std::size_t most_loaded_worker_thread_index =
- std::distance(num_queued_workorders_.begin(), max_element_iter);
- return std::make_pair(most_loaded_worker_thread_index, *max_element_iter);
- }
-
- private:
- std::size_t num_workers_;
-
- // The number of WorkOrders queued for execution per worker.
- std::vector<std::size_t> num_queued_workorders_;
-
- // The NUMA node IDs where the workers are pinned to. If a worker is not
- // pinned, the ID is -1.
- std::vector<int> numa_node_ids_;
-
- // The vector of client IDs
- std::vector<client_id> client_ids_;
-
- DISALLOW_COPY_AND_ASSIGN(WorkerDirectory);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
deleted file mode 100644
index bd84ac6..0000000
--- a/query_execution/WorkerMessage.hpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_
-
-#include <cstddef>
-
-namespace quickstep {
-
-class WorkOrder;
-/**
- * @brief The messages to be sent to the worker from Foreman.
- *
- * @note This class is copyable.
- **/
-class WorkerMessage {
- public:
- static constexpr int kInvalidRecipientIndexHint = -1;
-
- enum class WorkerMessageType {
- kRebuildWorkOrder = 0,
- kWorkOrder,
- };
-
- /**
- * @brief A static factory method for generating rebuild WorkOrder messages.
- *
- * @param rebuild_workorder The rebuild WorkOrder to be executed by the worker.
- * @param relational_op_index The index of the relational operator in the
- * query plan DAG that generated the given rebuild WorkOrder.
- *
- * @return The constructed RebuildWorkOrderMessage.
- **/
- static WorkerMessage* RebuildWorkOrderMessage(WorkOrder *rebuild_workorder, const std::size_t relational_op_index) {
- return new WorkerMessage(rebuild_workorder,
- relational_op_index,
- WorkerMessageType::kRebuildWorkOrder);
- }
-
- /**
- * @brief A static factory method for generating WorkOrder messages.
- *
- * @param workorder The work order to be executed by the worker.
- * @param relational_op_index The index of the relational operator in the
- * query plan DAG that generated the given
- * workorder.
- *
- * @return The constructed WorkOrderMessage.
- **/
- static WorkerMessage* WorkOrderMessage(WorkOrder *workorder, const std::size_t relational_op_index) {
- return new WorkerMessage(workorder,
- relational_op_index,
- WorkerMessageType::kWorkOrder);
- }
-
- /**
- * @brief Destructor.
- **/
- ~WorkerMessage() {
- }
-
- /**
- * @brief Gets the work order to be executed by the worker.
- * @return A pointer to the work order which the worker should execute.
- **/
- inline WorkOrder* getWorkOrder() const {
- return work_unit_;
- }
-
- /**
- * @brief Get the index of the relational operator in the DAG that generated
- * the workorder.
- **/
- inline std::size_t getRelationalOpIndex() const {
- return relational_op_index_;
- }
-
- /**
- * @brief Get the type of this WorkerMessage.
- **/
- inline WorkerMessageType getType() const {
- return type_;
- }
-
- /**
- * @brief Set a hint for the recipient worker thread.
- *
- * @param recipient_index_hint The hint i.e. the worker thread index.
- **/
- inline void setRecipientHint(const int recipient_index_hint) {
- recipient_index_hint_ = recipient_index_hint;
- }
-
- /**
- * @brief Get the hint for the recipient worker thread. The hint is invalid if
- * it is kInvalidRecipientIndexHint.
- **/
- inline int getRecipientHint() const {
- return recipient_index_hint_;
- }
-
- private:
- /**
- * @brief Constructor.
- *
- * @param work_unit The work order to be executed by the worker.
- * @param relational_op_index The index of the relational operator in the
- * query plan DAG that generated the given WorkOrder.
- * @param type Type of the WorkerMessage.
- **/
- WorkerMessage(WorkOrder *work_unit,
- const std::size_t relational_op_index,
- const WorkerMessageType type)
- : work_unit_(work_unit),
- relational_op_index_(relational_op_index),
- type_(type),
- recipient_index_hint_(kInvalidRecipientIndexHint) {}
-
- WorkOrder *work_unit_;
- const std::size_t relational_op_index_;
- const WorkerMessageType type_;
- int recipient_index_hint_;
-};
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_MESSAGE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/WorkerSelectionPolicy.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerSelectionPolicy.hpp b/query_execution/WorkerSelectionPolicy.hpp
deleted file mode 100644
index 4f4c6eb..0000000
--- a/query_execution/WorkerSelectionPolicy.hpp
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_
-
-#include <cstddef>
-#include <random>
-#include <utility>
-
-#include "query_execution/WorkerDirectory.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief Base class for a policy to select workers for workorder execution.
- **/
-class WorkerSelectionPolicy {
- public:
- /**
- * @brief Constructor.
- *
- * @param worker_directory The worker directory.
- **/
- explicit WorkerSelectionPolicy(const WorkerDirectory &worker_directory)
- : worker_directory_(worker_directory) {
- if (worker_directory.getNumWorkers() == 0) {
- FATAL_ERROR("WorkerSelectionPolicy needs at least one worker");
- }
- }
-
- /**
- * @brief Choose the worker for next workorder execution based on the policy.
- *
- * @return The logical ID of the worker chosen for next workorder execution.
- **/
- virtual std::size_t getNextWorkerID() = 0;
-
- protected:
- const WorkerDirectory &worker_directory_;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(WorkerSelectionPolicy);
-};
-
-/**
- * @brief Choose the next worker in a round robin manner.
- **/
-class RoundRobinWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
- /**
- * @brief Constructor.
- *
- * @param worker_directory The worker directory.
- * @param start_worker_thread_index The ID of the worker which should be chosen
- * initially.
- *
- * @warning If the number of workers change between the constructor and
- * getNextWorkerID() invocations, the worker ID returned by
- * getNextWorkerID() initially may not be the same as desired.
- **/
- RoundRobinWorkerSelectionPolicy(const WorkerDirectory &worker_directory,
- const std::size_t start_worker_thread_index)
- : WorkerSelectionPolicy(worker_directory) {
- if (start_worker_thread_index == 0) {
- previous_worker_thread_index_ = worker_directory.getNumWorkers() - 1;
- } else {
- previous_worker_thread_index_ = start_worker_thread_index - 1;
- }
- }
-
- std::size_t getNextWorkerID() override {
- previous_worker_thread_index_ =
- (previous_worker_thread_index_ + 1) % worker_directory_.getNumWorkers();
- return previous_worker_thread_index_;
- }
-
- private:
- std::size_t previous_worker_thread_index_;
-
- DISALLOW_COPY_AND_ASSIGN(RoundRobinWorkerSelectionPolicy);
-};
-
-/**
- * @brief Choose the next worker so that the load among workers is balanced.
- **/
-class LoadBalancingWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
- explicit LoadBalancingWorkerSelectionPolicy(
- const WorkerDirectory &worker_directory)
- : WorkerSelectionPolicy(worker_directory) {}
-
- std::size_t getNextWorkerID() override {
- return worker_directory_.getLeastLoadedWorker().first;
- }
-
- private:
- DISALLOW_COPY_AND_ASSIGN(LoadBalancingWorkerSelectionPolicy);
-};
-
-/**
- * @brief Choose a (uniform) random worker.
- **/
-class RandomWorkerSelectionPolicy : public WorkerSelectionPolicy {
- public:
- explicit RandomWorkerSelectionPolicy(const WorkerDirectory &worker_directory)
- : WorkerSelectionPolicy(worker_directory),
- mt_(std::random_device()()) {}
-
- std::size_t getNextWorkerID() override {
- std::uniform_int_distribution<std::size_t> dist(
- 0, worker_directory_.getNumWorkers() - 1);
- return dist(mt_);
- }
-
- private:
- std::mt19937_64 mt_;
-
- DISALLOW_COPY_AND_ASSIGN(RandomWorkerSelectionPolicy);
-};
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_SELECTION_POLICY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
deleted file mode 100644
index 32437c3..0000000
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include <cstdlib>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogAttribute.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "query_execution/BlockLocator.hpp"
-#include "query_execution/BlockLocatorUtil.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "types/TypeFactory.hpp"
-#include "types/TypeID.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::AnnotatedMessage;
-using tmb::MessageBus;
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-class BlockLocatorTest : public ::testing::Test {
- protected:
- static const char kStoragePath[];
- static const char kDomainNetworkAddress[];
-
- ~BlockLocatorTest() {
- locator_->join();
- }
-
- virtual void SetUp() {
- bus_.Initialize();
-
- locator_.reset(new BlockLocator(&bus_));
- locator_->start();
-
- worker_client_id_ = bus_.Connect();
-
- bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
-
- bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
-
- bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
-
- block_domain_ =
- block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_);
- DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
-
- storage_manager_.reset(
- new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
- }
-
- virtual void TearDown() {
- storage_manager_.reset();
-
- TaggedMessage message(kPoisonMessage);
-
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent PoisonMessage (typed '" << kPoisonMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- worker_client_id_,
- locator_client_id_,
- move(message)));
- }
-
- vector<block_id_domain> getPeerDomains(const block_id block) {
- serialization::BlockMessage proto;
- proto.set_block_id(block);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kLocateBlockMessage);
- free(proto_bytes);
-
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage
- << "') to BlockLocator";
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- worker_client_id_,
- locator_client_id_,
- move(message)));
-
- const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') received LocateBlockResponseMessage from BlockLocator";
-
- serialization::LocateBlockResponseMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- vector<block_id_domain> domains;
- for (int i = 0; i < response_proto.block_domains_size(); ++i) {
- domains.push_back(response_proto.block_domains(i));
- }
-
- return domains;
- }
-
- void checkLoaded(const block_id block) {
- const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
- EXPECT_EQ(1u, peer_domain_network_addresses.size());
- EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
-
- const vector<block_id_domain> domains = getPeerDomains(block);
- EXPECT_EQ(1u, domains.size());
- EXPECT_EQ(block_domain_, domains[0]);
- }
-
- void checkEvicted(const block_id block) {
- const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
- EXPECT_TRUE(peer_domain_network_addresses.empty());
-
- const vector<block_id_domain> domains = getPeerDomains(block);
- EXPECT_TRUE(domains.empty());
- }
-
- tmb::client_id worker_client_id_;
-
- block_id_domain block_domain_;
- unique_ptr<StorageManager> storage_manager_;
-
- private:
- MessageBusImpl bus_;
-
- unique_ptr<BlockLocator> locator_;
- tmb::client_id locator_client_id_;
-};
-
-const char BlockLocatorTest::kStoragePath[] = "./block_locator_test_data/";
-const char BlockLocatorTest::kDomainNetworkAddress[] = "ip:port";
-
-TEST_F(BlockLocatorTest, BlockTest) {
- CatalogRelation relation(nullptr, "rel");
- relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
-
- const block_id block =
- storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout());
- checkLoaded(block);
-
- ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block));
- storage_manager_->evictBlockOrBlob(block);
- checkEvicted(block);
-
- {
- const BlockReference block_ref = storage_manager_->getBlock(block, relation);
- }
- checkLoaded(block);
-
- storage_manager_->deleteBlockOrBlobFile(block);
- checkEvicted(block);
-}
-
-TEST_F(BlockLocatorTest, BlobTest) {
- const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
- checkLoaded(blob);
-
- ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob));
- storage_manager_->evictBlockOrBlob(blob);
- checkEvicted(blob);
-
- {
- const BlobReference blob_ref = storage_manager_->getBlob(blob);
- }
- checkLoaded(blob);
-
- storage_manager_->deleteBlockOrBlobFile(blob);
- checkEvicted(blob);
-}
-
-} // namespace quickstep
-
-int main(int argc, char **argv) {
- google::InitGoogleLogging(argv[0]);
- // Honor FLAGS_buffer_pool_slots in StorageManager.
- gflags::ParseCommandLineFlags(&argc, &argv, true);
-
- ::testing::InitGoogleTest(&argc, argv);
-
- return RUN_ALL_TESTS();
-}