You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:41 UTC
[18/51] [abbrv] [partial] incubator-quickstep git commit: remove c++
files
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
deleted file mode 100644
index fb9a9d6..0000000
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
-
-#include <cstddef>
-#include <unordered_map>
-#include <vector>
-
-#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
-#include "threading/ThreadIDBasedMap.hpp"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_style.h"
-#include "tmb/pure_memory_message_bus.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-// Local shorthands for the TMB types used throughout query execution.
-using Address = tmb::Address;
-using AnnotatedMessage = tmb::AnnotatedMessage;
-using MessageBus = tmb::MessageBus;
-using MessageStyle = tmb::MessageStyle;
-using Priority = tmb::Priority;
-using MessageBusImpl = tmb::PureMemoryMessageBus<false>;
-using TaggedMessage = tmb::TaggedMessage;
-using client_id = tmb::client_id;
-using message_type_id = tmb::message_type_id;
-
-// ThreadIDBasedMap instantiated with client_id. The trailing character
-// arguments spell out "ClientIDMap".
-using ClientIDMap = ThreadIDBasedMap<
-    client_id, 'C', 'l', 'i', 'e', 'n', 't', 'I', 'D', 'M', 'a', 'p'>;
-
-// Types of the TMB messages exchanged during query execution. The enumerators
-// are listed in the order in which they arise during the life cycle of a query.
-// Values are assigned sequentially starting from kAdmitRequestMessage = 0; the
-// enumerators after kPoisonMessage exist only when QUICKSTEP_DISTRIBUTED is defined.
-enum QueryExecutionMessageType : message_type_id {
- kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from
- // the main thread to Foreman.
- kWorkOrderMessage, // From Foreman to Worker.
- kWorkOrderCompleteMessage, // From Worker to Foreman.
- kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
- kDataPipelineMessage, // From InsertDestination or some WorkOrders to Foreman.
- kWorkOrderFeedbackMessage, // From some WorkOrders to Foreman on behalf of
- // their corresponding RelationalOperators.
- kRebuildWorkOrderMessage, // From Foreman to Worker.
- kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
- kWorkloadCompletionMessage, // From Foreman to main thread.
- kPoisonMessage, // From the main thread to Foreman and Workers.
-
-#ifdef QUICKSTEP_DISTRIBUTED
- kShiftbossRegistrationMessage, // From Shiftboss to Foreman.
- kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss, or from
- // Shiftboss to Worker.
- kQueryInitiateMessage, // From Foreman to Shiftboss.
- kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
-
- kInitiateRebuildMessage, // From Foreman to Shiftboss.
- kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
-
- kQueryTeardownMessage, // From Foreman to Shiftboss.
-
- kSaveQueryResultMessage, // From Foreman to Shiftboss.
- kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
-
- // From Foreman to CLI.
- kQueryExecutionSuccessMessage,
-
- // BlockLocator related messages, sorted in a life cycle of StorageManager
- // with a unique block domain.
- kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
- kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
- kAddBlockLocationMessage, // From StorageManager to BlockLocator.
- kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
- kLocateBlockMessage, // From StorageManager to BlockLocator.
- kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
- kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
- kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
- kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
-#endif
-};
-
-// WorkOrder profiling data structures.
-// Profiling record for an individual work order: the IDs of the worker and
-// operator involved, plus the start and end timestamps.
-struct WorkOrderTimeEntry {
- std::size_t worker_id;
- std::size_t operator_id;
- std::size_t start_time; // Epoch time measured in microseconds.
- std::size_t end_time; // Epoch time measured in microseconds.
-};
-// Per-query collection of work order profiling records.
-// Key = query ID.
-// Value = vector of work order profiling records for that query.
-typedef std::unordered_map<std::size_t, std::vector<WorkOrderTimeEntry>> WorkOrderTimeRecorder;
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
deleted file mode 100644
index b41965c..0000000
--- a/query_execution/QueryExecutionUtil.hpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-
-#include "query_execution/AdmitRequestMessage.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-class QueryHandle;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A static class for reusable methods in query_execution module.
- **/
-class QueryExecutionUtil {
- public:
-  /**
-   * @brief Send a TMB message to a single receiver.
-   *
-   * @param bus A pointer to the TMB.
-   * @param sender_id The client ID of the sender.
-   * @param receiver_id The client ID of the receiver.
-   * @param tagged_message The message to send. It is moved into the call to
-   *        MessageBus::Send(), so the caller must not reuse it afterwards.
-   *
-   * @return A status code indicating the result of the message getting sent.
-   *         The caller should ensure that the status is SendStatus::kOK.
-   **/
-  static tmb::MessageBus::SendStatus SendTMBMessage(
-      tmb::MessageBus *bus,
-      tmb::client_id sender_id,
-      tmb::client_id receiver_id,
-      tmb::TaggedMessage &&tagged_message) {  // NOLINT(whitespace/operators)
-    tmb::Address receiver_address;
-    receiver_address.AddRecipient(receiver_id);
-
-    // The default-constructed style is used as-is for a single receiver.
-    tmb::MessageStyle single_receiver_style;
-    return bus->Send(sender_id,
-                     receiver_address,
-                     single_receiver_style,
-                     std::move(tagged_message));
-  }
-
-  /**
-   * @brief Construct and send an AdmitRequestMessage from a given sender to a
-   *        given recipient.
-   *
-   * @param sender_id The TMB client ID of the sender.
-   * @param receiver_id The TMB client ID of the receiver.
-   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
-   * @param bus A pointer to the TMB.
-   *
-   * @return A status code indicating the result of the message delivery.
-   *         The caller should ensure that the status is SendStatus::kOK.
-   **/
-  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
-      const tmb::client_id sender_id,
-      const tmb::client_id receiver_id,
-      QueryHandle *query_handle,
-      tmb::MessageBus *bus) {
-    std::unique_ptr<AdmitRequestMessage> request_message(
-        new AdmitRequestMessage(query_handle));
-    const std::size_t size_of_request_msg = sizeof(*request_message);
-    // The released pointer and its size are handed to the TaggedMessage, which
-    // is tagged with kAdmitRequestMessage.
-    tmb::TaggedMessage admit_tagged_message(
-        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
-
-    return QueryExecutionUtil::SendTMBMessage(
-        bus, sender_id, receiver_id, std::move(admit_tagged_message));
-  }
-
-  /**
-   * @brief Receive a query completion message.
-   *
-   * @param receiver_id The TMB client ID of the receiver thread.
-   * @param bus A pointer to the TMB.
-   *
-   * @note Right now the query completion message is of no interest to the
-   *       caller. In the future, if this message needs to be fetched, make this
-   *       function return the TaggedMessage.
-   * @note The type of the received message is only verified (to be
-   *       kWorkloadCompletionMessage) through a DCHECK.
-   **/
-  static void ReceiveQueryCompletionMessage(const tmb::client_id receiver_id,
-                                            tmb::MessageBus *bus) {
-    const tmb::AnnotatedMessage annotated_msg =
-        bus->Receive(receiver_id, 0, true);
-    const tmb::TaggedMessage &tagged_message = annotated_msg.tagged_message;
-    DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
-  }
-
-  /**
-   * @brief Broadcast a TMB message to the given addresses.
-   *
-   * @param sender_id The TMB client ID of the sender.
-   * @param addresses The addresses the message is broadcast to.
-   * @param tagged_message The message to broadcast. It is moved into the call
-   *        to MessageBus::Send().
-   * @param bus A pointer to the TMB.
-   *
-   * @note A send status other than SendStatus::kOK fails a CHECK.
-   **/
-  static void BroadcastMessage(const tmb::client_id sender_id,
-                               const tmb::Address &addresses,
-                               tmb::TaggedMessage &&tagged_message,  // NOLINT(whitespace/operators)
-                               tmb::MessageBus *bus) {
-    // The sender broadcasts the given message to all 'addresses'.
-    tmb::MessageStyle style;
-    style.Broadcast(true);
-
-    const tmb::MessageBus::SendStatus send_status =
-        bus->Send(sender_id, addresses, style, std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-  }
-
-  /**
-   * @brief Broadcast a poison message (typed kPoisonMessage) in order to
-   *        terminate all threads.
-   *
-   * @param sender_id The TMB client ID of the sender.
-   * @param bus A pointer to the TMB.
-   *
-   * @note A send status other than SendStatus::kOK fails a CHECK.
-   **/
-  static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
-    // Terminate all threads.
-    // The sender thread broadcasts poison message to the workers and foreman.
-    // Each worker dies after receiving poison message. The order of workers'
-    // death is irrelevant.
-    tmb::Address address;
-    address.All(true);
-    tmb::TaggedMessage poison_tagged_message(kPoisonMessage);
-
-    DLOG(INFO) << "TMB client ID " << sender_id
-               << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
-    // Reuse the generic broadcast path (broadcast style, send, CHECK on kOK).
-    BroadcastMessage(sender_id, address, std::move(poison_tagged_message), bus);
-  }
-
- private:
-  /**
-   * @brief Constructor. Made private to avoid instantiation.
-   **/
-  QueryExecutionUtil();
-
-  DISALLOW_COPY_AND_ASSIGN(QueryExecutionUtil);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
deleted file mode 100644
index 8e37da8..0000000
--- a/query_execution/QueryManagerBase.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerBase.hpp"
-
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "glog/logging.h"
-
-using std::pair;
-
-namespace quickstep {
-
-// Sets up the per-operator bookkeeping for the query DAG: operators with an
-// InsertDestination are flagged as requiring a rebuild, and every DAG link is
-// recorded either as a streaming link (output consumer) or as a
-// pipeline-breaking link (blocking dependency).
-QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
-    : query_handle_(DCHECK_NOTNULL(query_handle)),
-      query_id_(query_handle->query_id()),
-      query_dag_(DCHECK_NOTNULL(
-          DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
-      num_operators_in_dag_(query_dag_->size()),
-      output_consumers_(num_operators_in_dag_),
-      blocking_dependencies_(num_operators_in_dag_),
-      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
-  for (dag_node_index producer = 0; producer < num_operators_in_dag_; ++producer) {
-    const QueryContext::insert_destination_id destination_id =
-        query_dag_->getNodePayload(producer).getInsertDestinationID();
-    const bool has_insert_destination =
-        destination_id != QueryContext::kInvalidInsertDestinationId;
-    if (has_insert_destination) {
-      // The presence of an InsertDestination always implies a rebuild.
-      query_exec_state_->setRebuildRequired(producer);
-      query_exec_state_->setRebuildStatus(producer, 0, false);
-    }
-
-    for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(producer)) {
-      const dag_node_index consumer = link.first;
-      const bool is_pipeline_breaker = query_dag_->getLinkMetadata(producer, consumer);
-      if (is_pipeline_breaker) {
-        // Blocks cannot be streamed across this link; the consumer must wait
-        // for the producer.
-        blocking_dependencies_[consumer].push_back(producer);
-      } else {
-        // Blocks can be streamed from the producer to the consumer.
-        output_consumers_[producer].push_back(consumer);
-      }
-    }
-  }
-}
-
-// Reports whether the whole query, or only the operator at 'op_index', has
-// finished executing.
-QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus(
-    const dag_node_index op_index) {
-  // kQueryExecuted outranks kOperatorExecuted, so whole-query completion is
-  // tested before the per-operator state.
-  if (query_exec_state_->hasQueryExecutionFinished()) {
-    return QueryStatusCode::kQueryExecuted;
-  }
-
-  return query_exec_state_->hasExecutionFinished(op_index)
-             ? QueryStatusCode::kOperatorExecuted
-             : QueryStatusCode::kNone;
-}
-
-// Forwards a work order feedback message to the operator at 'op_index'.
-void QueryManagerBase::processFeedbackMessage(
-    const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) {
-  query_dag_->getNodePayloadMutable(op_index)->receiveFeedbackMessage(msg);
-}
-
-// Handles the completion of a normal WorkOrder of the operator at 'op_index':
-// updates the queued count, fetches newly available work orders, marks the
-// operator finished when appropriate, and then processes the dependents whose
-// blocking dependencies are all met.
-void QueryManagerBase::processWorkOrderCompleteMessage(
-    const dag_node_index op_index) {
-  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
-  // Pick up any work orders that have become available in the meantime.
-  fetchNormalWorkOrders(op_index);
-
-  if (!checkRebuildRequired(op_index)) {
-    // No rebuild phase: the operator is finished once its execution is over.
-    if (checkOperatorExecutionOver(op_index)) {
-      markOperatorFinished(op_index);
-    }
-  } else if (checkNormalExecutionOver(op_index)) {
-    if (checkRebuildInitiated(op_index)) {
-      // The rebuild was already in progress; finish only if it is now over.
-      if (checkRebuildOver(op_index)) {
-        markOperatorFinished(op_index);
-      }
-    } else if (initiateRebuild(op_index)) {
-      // The rebuild was initiated and completed right away.
-      markOperatorFinished(op_index);
-    }
-    // Otherwise the rebuild has just started and is still in progress.
-  }
-  // Otherwise normal execution of this operator is still in progress.
-
-  for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent = link.first;
-    if (checkAllBlockingDependenciesMet(dependent)) {
-      // Every blocking dependency of this dependent has been met, so it can
-      // be processed.
-      processOperator(dependent, true);
-    }
-  }
-}
-
-// Handles the completion of a rebuild WorkOrder of the operator at 'op_index'.
-// Once the rebuild is over, the operator is marked finished and its unblocked
-// dependents are processed.
-void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
-  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-
-  if (!checkRebuildOver(op_index)) {
-    // Rebuild work orders are still outstanding.
-    return;
-  }
-
-  markOperatorFinished(op_index);
-
-  for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent = link.first;
-    if (checkAllBlockingDependenciesMet(dependent)) {
-      processOperator(dependent, true);
-    }
-  }
-}
-
-// Fetches the work orders of the operator at 'index' and, if its normal
-// execution is over, drives its rebuild / completion. Optionally recurses into
-// the dependents whose blocking dependencies are all met.
-void QueryManagerBase::processOperator(const dag_node_index index,
-                                       const bool recursively_check_dependents) {
-  if (fetchNormalWorkOrders(index)) {
-    // New work orders were fetched; wait for them to execute and skip the
-    // execution-finished checks.
-    return;
-  }
-
-  if (!checkNormalExecutionOver(index)) {
-    return;
-  }
-
-  if (!checkRebuildRequired(index)) {
-    // No rebuild needed and normal execution is over.
-    markOperatorFinished(index);
-  } else if (checkRebuildInitiated(index)) {
-    // The rebuild had been initiated earlier; finish only if it is over.
-    if (checkRebuildOver(index)) {
-      markOperatorFinished(index);
-    }
-  } else if (initiateRebuild(index)) {
-    // The rebuild was initiated and completed right away.
-    markOperatorFinished(index);
-  } else {
-    // Rebuild WorkOrders have been generated; wait for them.
-    return;
-  }
-
-  // Reached when the operator has been marked finished, or when a previously
-  // initiated rebuild is still in progress.
-  if (!recursively_check_dependents) {
-    return;
-  }
-
-  for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(index)) {
-    const dag_node_index dependent = link.first;
-    if (checkAllBlockingDependenciesMet(dependent)) {
-      processOperator(dependent, true);
-    }
-  }
-}
-
-// Feeds a block streamed out of the operator at 'op_index' to each of its
-// output consumers and fetches the work orders this makes available.
-void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
-                                                  const block_id block,
-                                                  const relation_id rel_id) {
-  // 'output_consumers_' holds only those dependents of 'op_index' that are
-  // eligible to receive streamed input.
-  const std::vector<dag_node_index> &consumers = output_consumers_[op_index];
-  for (const dag_node_index consumer : consumers) {
-    RelationalOperator *consumer_op = query_dag_->getNodePayloadMutable(consumer);
-    consumer_op->feedInputBlock(block, rel_id);
-    // The block just fed may have made new WorkOrders available.
-    fetchNormalWorkOrders(consumer);
-  }
-}
-
-// Records the operator at 'index' as finished, lets it update the catalog, and
-// notifies each of its dependents.
-void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
-  query_exec_state_->setExecutionFinished(index);
-
-  RelationalOperator *finished_op = query_dag_->getNodePayloadMutable(index);
-  finished_op->updateCatalogOnCompletion();
-
-  const relation_id output_rel = finished_op->getOutputRelationID();
-  // Only a non-negative output relation ID is announced to the dependents.
-  const bool has_output_relation = output_rel >= 0;
-
-  for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(index)) {
-    const dag_node_index dependent = link.first;
-    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent);
-    if (has_output_relation) {
-      // The finished operator will not feed any more input blocks.
-      dependent_op->doneFeedingInputBlocks(output_rel);
-    }
-    if (checkAllBlockingDependenciesMet(dependent)) {
-      dependent_op->informAllBlockingDependenciesMet();
-    }
-  }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
deleted file mode 100644
index a274742..0000000
--- a/query_execution/QueryManagerBase.hpp
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-namespace quickstep {
-
-class QueryHandle;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A base class that manages the execution of a query including
- * generation of new work orders, and keeping track of the query
- * execution state.
- **/
-class QueryManagerBase {
- public:
- typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
-
- /**
- * @brief Return codes for queryStatus() function.
- *
- * @note When both operator and query get executed, kQueryExecuted takes
- * precedence over kOperatorExecuted.
- **/
- enum class QueryStatusCode {
- kOperatorExecuted = 0, // An operator in the query finished execution.
- kQueryExecuted, // The query got executed.
- kNone // None of the above.
- };
-
- /**
- * @brief Constructor.
- *
- * @param query_handle The QueryHandle object for this query. Must not be
- * NULL, and neither may its query plan nor the plan's DAG.
- **/
- explicit QueryManagerBase(QueryHandle *query_handle);
-
- /**
- * @brief Virtual destructor.
- **/
- virtual ~QueryManagerBase() {}
-
- /**
- * @brief Get the query handle.
- **/
- const QueryHandle* query_handle() const {
- return query_handle_;
- }
-
- /**
- * @brief Get the QueryExecutionState for this query.
- **/
- inline const QueryExecutionState& getQueryExecutionState() const {
- return *query_exec_state_;
- }
-
- /**
- * @brief Process the received WorkOrder complete message.
- *
- * @param op_index The index of the specified operator node in the query DAG
- * for the completed WorkOrder.
- **/
- void processWorkOrderCompleteMessage(const dag_node_index op_index);
-
- /**
- * @brief Process the received RebuildWorkOrder complete message.
- *
- * @param op_index The index of the specified operator node in the query DAG
- * for the completed RebuildWorkOrder.
- **/
- void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
-
- /**
- * @brief Process the received data pipeline message.
- *
- * @param op_index The index of the specified operator node in the query DAG
- * for the pipelining block.
- * @param block The block id.
- * @param rel_id The ID of the relation that produced 'block'.
- **/
- void processDataPipelineMessage(const dag_node_index op_index,
- const block_id block,
- const relation_id rel_id);
-
- /**
- * @brief Fetch all work orders currently available in relational operator and
- * store them internally.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- *
- * @return Whether any work order was generated by the operator.
- **/
- virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
-
- /**
- * @brief Process the received work order feedback message and notify
- * relational operator.
- *
- * @param op_index The index of the specified operator node in the query DAG
- * for the feedback message.
- * @param message Feedback message from work order.
- **/
- void processFeedbackMessage(const dag_node_index op_index,
- const WorkOrder::FeedbackMessage &message);
-
- /**
- * @brief Get the query status after processing an incoming message.
- *
- * @param op_index The index of the specified operator node in the query DAG
- * for the incoming message.
- *
- * @return QueryStatusCode as determined after the message is processed.
- **/
- QueryStatusCode queryStatus(const dag_node_index op_index);
-
- protected:
- /**
- * @brief Process a current relational operator: Get its workorders and store
- * them in the WorkOrdersContainer for this query. If the operator can
- * be marked as done, do so.
- *
- * @note If new normal work orders are fetched, or if rebuild work orders
- * get generated, this function returns without marking the operator
- * as done and without visiting its dependents.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- * @param recursively_check_dependents If an operator is done, should we
- * call processOperator on its dependents recursively.
- **/
- void processOperator(const dag_node_index index,
- const bool recursively_check_dependents);
-
- /**
- * @brief This function does the following things:
- * 1. Mark the given relational operator as "done" and let it update
- * the catalog on completion.
- * 2. If the given operator has a non-negative output relation ID,
- * inform the dependents that they won't receive input anymore
- * from the given operator.
- * 3. For all the dependents of this operator, check if all of their
- * blocking dependencies are met. If so inform them that the blocking
- * dependencies are met.
- *
- * @param index The index of the given relational operator in the DAG.
- **/
- void markOperatorFinished(const dag_node_index index);
-
- /**
- * @brief Check if all the dependencies of the node at specified index have
- * finished their execution.
- *
- * @note This function's true return value is a pre-requisite for calling
- * getRebuildWorkOrders()
- *
- * @param node_index The index of the specified node in the query DAG.
- *
- * @return True if all the dependencies have finished their execution. False
- * otherwise.
- **/
- inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
- for (const dag_node_index dependency_index :
- query_dag_->getDependencies(node_index)) {
- // If at least one of the dependencies is not met, return false.
- if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @brief Check if all the blocking dependencies of the node at specified
- * index have finished their execution.
- *
- * @note A blocking dependency is the one which is pipeline breaker. Output of
- * a dependency can't be streamed to its dependent if the link between
- * them is pipeline breaker.
- *
- * @param node_index The index of the specified node in the query DAG.
- *
- * @return True if all the blocking dependencies have finished their
- * execution (also when the node has no blocking dependency at all).
- * False otherwise.
- **/
- inline bool checkAllBlockingDependenciesMet(
- const dag_node_index node_index) const {
- for (const dag_node_index blocking_dependency_index :
- blocking_dependencies_[node_index]) {
- if (!query_exec_state_->hasExecutionFinished(
- blocking_dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @brief Check if the execution of the given operator is over, i.e. its
- * normal execution is over and, if a rebuild is required, the
- * rebuild is over as well.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the execution of the given operator is over, false
- * otherwise.
- **/
- inline bool checkOperatorExecutionOver(const dag_node_index index) const {
- return this->checkNormalExecutionOver(index) &&
- (!checkRebuildRequired(index) || this->checkRebuildOver(index));
- }
-
- /**
- * @brief Check if the rebuild operation is required for a given operator.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation is required, false otherwise.
- **/
- inline bool checkRebuildRequired(const dag_node_index index) const {
- return query_exec_state_->isRebuildRequired(index);
- }
-
- /**
- * @brief Check if the rebuild operation for a given operator has been
- * initiated.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation has been initiated, false otherwise.
- **/
- inline bool checkRebuildInitiated(const dag_node_index index) const {
- return query_exec_state_->hasRebuildInitiated(index);
- }
-
- const QueryHandle *query_handle_;
-
- const std::size_t query_id_;
-
- DAG<RelationalOperator, bool> *query_dag_; // Owned by 'query_handle_'.
- const dag_node_index num_operators_in_dag_;
-
- // For each node, the dependents reached through a link that is not a pipeline breaker.
- std::vector<std::vector<dag_node_index>> output_consumers_;
-
- // For each node, the dependencies reached through a pipeline-breaking link (if any).
- std::vector<std::vector<dag_node_index>> blocking_dependencies_;
-
- std::unique_ptr<QueryExecutionState> query_exec_state_;
-
- private:
- /**
- * @brief Check if the given operator's normal execution is over.
- *
- * @note The conditions for a given operator's normal execution to get over:
- * 1. All of its normal (i.e. non rebuild) WorkOrders have finished
- * execution.
- * 2. The operator is done generating work orders.
- * 3. All of the dependencies of the given operator have been met.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the normal execution of the given operator is over, false
- * otherwise.
- **/
- virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0;
-
- /**
- * @brief Initiate the rebuild process for partially filled blocks generated
- * during the execution of the given operator.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild is over immediately, i.e. the operator didn't
- * generate any rebuild WorkOrders, false otherwise.
- **/
- virtual bool initiateRebuild(const dag_node_index index) = 0;
-
- /**
- * @brief Check if the rebuild operation for a given operator is over.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation is over, false otherwise.
- **/
- virtual bool checkRebuildOver(const dag_node_index index) const = 0;
-
- DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
deleted file mode 100644
index 7d45933..0000000
--- a/query_execution/QueryManagerDistributed.cpp
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerDistributed.hpp"
-
-#include <cstddef>
-#include <cstdlib>
-#include <memory>
-#include <utility>
-
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
-#include "query_execution/WorkOrderProtosContainer.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.pb.h"
-#include "utility/DAG.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::size_t;
-using std::unique_ptr;
-
-namespace quickstep {
-
-QueryManagerDistributed::QueryManagerDistributed(
-    QueryHandle *query_handle,
-    const ShiftbossDirectory *shiftboss_directory,
-    const tmb::client_id foreman_client_id,
-    tmb::MessageBus *bus)
-    : QueryManagerBase(query_handle),
-      shiftboss_directory_(shiftboss_directory),
-      foreman_client_id_(foreman_client_id),
-      bus_(bus),
-      normal_workorder_protos_container_(
-          new WorkOrderProtosContainer(num_operators_in_dag_)) {
-  // Visit every operator in the DAG once. Operators whose blocking
-  // dependencies are already satisfied are notified of that fact and then
-  // handed to processOperator(); all other operators are left untouched here.
-  for (dag_node_index op_index = 0; op_index < num_operators_in_dag_; ++op_index) {
-    if (!checkAllBlockingDependenciesMet(op_index)) {
-      continue;
-    }
-
-    query_dag_->getNodePayloadMutable(op_index)->informAllBlockingDependenciesMet();
-    processOperator(op_index, false);
-  }
-}
-
-serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage(
-    const dag_node_index start_operator_index) {
-  // Scan each operator at most once, beginning at 'start_operator_index' and
-  // wrapping around past the last operator in the DAG. The first operator that
-  // is not finished and has a pending WorkOrder proto wins.
-  dag_node_index op_index = start_operator_index;
-  for (size_t num_visited = 0;
-       num_visited < num_operators_in_dag_;
-       ++num_visited, op_index = (op_index + 1) % num_operators_in_dag_) {
-    if (query_exec_state_->hasExecutionFinished(op_index)) {
-      continue;
-    }
-
-    // Take ownership of the proto handed out by the container (if any).
-    unique_ptr<serialization::WorkOrder> pending_proto(
-        normal_workorder_protos_container_->getWorkOrderProto(op_index));
-    if (pending_proto == nullptr) {
-      continue;
-    }
-
-    query_exec_state_->incrementNumQueuedWorkOrders(op_index);
-
-    unique_ptr<serialization::WorkOrderMessage> message(new serialization::WorkOrderMessage);
-    message->set_query_id(query_id_);
-    message->set_operator_index(op_index);
-    message->mutable_work_order()->MergeFrom(*pending_proto);
-
-    // Ownership of the message passes to the caller.
-    return message.release();
-  }
-
-  // None of the operators has a normal WorkOrder proto available right now.
-  return nullptr;
-}
-
-bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
-  // Nothing to fetch once the operator has reported that generation is done.
-  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    return false;
-  }
-
-  // Do not fetch any work units until all blocking dependencies are met.
-  // The relational operator is not aware of blocking dependencies for
-  // uncorrelated scalar queries.
-  if (!checkAllBlockingDependenciesMet(index)) {
-    return false;
-  }
-
-  const size_t num_protos_before =
-      normal_workorder_protos_container_->getNumWorkOrderProtos(index);
-
-  // The operator returns true once it will not produce any more protos.
-  if (query_dag_->getNodePayloadMutable(index)
-          ->getAllWorkOrderProtos(normal_workorder_protos_container_.get())) {
-    query_exec_state_->setDoneGenerationWorkOrders(index);
-  }
-
-  // TODO(shoban): It would be a good check to see if operator is making
-  // useful progress, i.e., the operator either generates work orders to
-  // execute or still has pending work orders executing. However, this will not
-  // work if Foreman polls operators without feeding data. This check can be
-  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-  // pending work orders are completed or new input blocks feed.
-
-  // New protos were generated iff the pending count grew during this call.
-  return num_protos_before <
-         normal_workorder_protos_container_->getNumWorkOrderProtos(index);
-}
-
-// Records how many rebuild work orders were generated for 'op_index'. When the
-// count is zero the operator is marked finished immediately and every
-// dependent whose blocking dependencies are now met is processed.
-void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                                                    const std::size_t num_rebuild_work_orders) {
-  // TODO(zuyu): Multiple Shiftbosses support.
-  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
-
-  if (num_rebuild_work_orders != 0u) {
-    // Wait for the rebuild work orders to finish.
-    return;
-  }
-
-  // No rebuild is needed.
-  markOperatorFinished(op_index);
-
-  // Bind with 'const auto &' so the loop refers to the container's elements
-  // directly. Spelling out std::pair<dag_node_index, bool> would silently copy
-  // each element into a temporary if the element type is not exactly that pair
-  // (NOTE(review): e.g. a map's pair<const Key, Value> -- confirm against
-  // DAG::getDependents()).
-  for (const auto &dependent_link : query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      processOperator(dependent_op_index, true);
-    }
-  }
-}
-
-// Sends an InitiateRebuildMessage for the operator at 'index' to the Shiftboss
-// and marks the rebuild as initiated. Always returns false: the number of
-// rebuild work orders is only known once the Shiftboss responds.
-bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
-  DCHECK(checkRebuildRequired(index));
-  DCHECK(!checkRebuildInitiated(index));
-
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-  DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
-
-  serialization::InitiateRebuildMessage proto;
-  proto.set_query_id(query_id_);
-  proto.set_operator_index(index);
-  proto.set_insert_destination_index(op.getInsertDestinationID());
-  proto.set_relation_id(op.getOutputRelationID());
-
-  // Serialize into an owned buffer. Unlike an unchecked malloc(), operator
-  // new[] never hands back a null pointer, and the unique_ptr releases the
-  // buffer on every exit path.
-  const size_t proto_length = proto.ByteSize();
-  unique_ptr<char[]> proto_bytes(new char[proto_length]);
-  CHECK(proto.SerializeToArray(proto_bytes.get(), proto_length));
-
-  TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes.get()),
-                           proto_length,
-                           kInitiateRebuildMessage);
-  // The serialized bytes are not needed once the message has been built (the
-  // buffer has always been released at this point), so drop them right away.
-  proto_bytes.reset();
-
-  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
-            << "') to Shiftboss";
-  // TODO(zuyu): Multiple workers support.
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     foreman_client_id_,
-                                     shiftboss_directory_->getClientId(0),
-                                     move(tagged_msg));
-
-  // The negative value indicates that the number of rebuild work orders is to be
-  // determined.
-  // NOTE(review): if setRebuildStatus() takes an unsigned count, -1 wraps to
-  // the maximum value rather than staying negative -- confirm.
-  query_exec_state_->setRebuildStatus(index, -1, true);
-
-  // Wait for Shiftbosses to report the number of rebuild work orders.
-  return false;
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
deleted file mode 100644
index e609ab8..0000000
--- a/query_execution/QueryManagerDistributed.hpp
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
-
-#include <cstddef>
-#include <memory>
-
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkOrderProtosContainer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class QueryHandle;
-class ShiftbossDirectory;
-
-namespace serialization { class WorkOrderMessage; }
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class that manages the execution of a query in the distributed
- *        setting, including the generation of new work order protos and
- *        keeping track of the query execution state.
- **/
-class QueryManagerDistributed final : public QueryManagerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param query_handle The QueryHandle object for this query.
-   * @param shiftboss_directory The ShiftbossDirectory to use.
-   * @param foreman_client_id The TMB client ID of the foreman thread.
-   * @param bus The TMB used for communication.
-   **/
-  QueryManagerDistributed(QueryHandle *query_handle,
-                          const ShiftbossDirectory *shiftboss_directory,
-                          const tmb::client_id foreman_client_id,
-                          tmb::MessageBus *bus);
-
-  ~QueryManagerDistributed() override {}
-
-  bool fetchNormalWorkOrders(const dag_node_index index) override;
-
-  /**
-   * @brief Process the response to an initiate-rebuild request.
-   *
-   * @param op_index The index of the operator node in the query DAG for which
-   *        the rebuild was initiated.
-   * @param num_rebuild_work_orders The number of rebuild work orders generated
-   *        for the operator indexed by 'op_index'.
-   **/
-  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                             const std::size_t num_rebuild_work_orders);
-
-  /**
-   * @brief Get the next normal work order to be executed, wrapped in a
-   *        WorkOrderMessage proto.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   *
-   * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder
-   *         to be executed, return NULL.
-   **/
-  serialization::WorkOrderMessage* getNextWorkOrderMessage(
-      const dag_node_index start_operator_index);
-
- private:
-  // Normal execution is over when all dependencies are met, no work order
-  // proto is pending in the container, no work order is queued, and the
-  // operator is done generating work orders. The checks run in that order and
-  // stop at the first one that fails.
-  bool checkNormalExecutionOver(const dag_node_index index) const override {
-    if (!checkAllDependenciesMet(index)) {
-      return false;
-    }
-    if (normal_workorder_protos_container_->hasWorkOrderProto(index)) {
-      return false;
-    }
-    if (query_exec_state_->getNumQueuedWorkOrders(index) != 0) {
-      return false;
-    }
-    return query_exec_state_->hasDoneGenerationWorkOrders(index);
-  }
-
-  bool initiateRebuild(const dag_node_index index) override;
-
-  // The rebuild is over once it has been initiated and the recorded number of
-  // rebuild work orders is zero.
-  bool checkRebuildOver(const dag_node_index index) const override {
-    if (!query_exec_state_->hasRebuildInitiated(index)) {
-      return false;
-    }
-    return query_exec_state_->getNumRebuildWorkOrders(index) == 0;
-  }
-
-  const ShiftbossDirectory *shiftboss_directory_;
-
-  const tmb::client_id foreman_client_id_;
-  tmb::MessageBus *bus_;
-
-  std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
deleted file mode 100644
index 237796f..0000000
--- a/query_execution/QueryManagerSingleNode.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/QueryManagerSingleNode.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlock.hpp"
-#include "utility/DAG.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/id_typedefs.h"
-
-namespace quickstep {
-
-class WorkOrder;
-
-// Builds the per-query QueryContext and WorkOrdersContainer, then processes
-// every operator whose blocking dependencies are already met.
-QueryManagerSingleNode::QueryManagerSingleNode(
-    const tmb::client_id foreman_client_id,
-    const std::size_t num_numa_nodes,
-    QueryHandle *query_handle,
-    CatalogDatabaseLite *catalog_database,
-    StorageManager *storage_manager,
-    tmb::MessageBus *bus)
-    : QueryManagerBase(query_handle),
-      foreman_client_id_(foreman_client_id),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)),
-      bus_(DCHECK_NOTNULL(bus)),
-      // 'catalog_database' is dereferenced below, so guard it the same way as
-      // the other pointer arguments instead of dereferencing it unchecked.
-      query_context_(new QueryContext(query_handle->getQueryContextProto(),
-                                      *DCHECK_NOTNULL(catalog_database),
-                                      storage_manager_,
-                                      foreman_client_id_,
-                                      bus_)),
-      workorders_container_(
-          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
-    }
-  }
-}
-
-// Returns the next schedulable WorkOrder wrapped in a WorkerMessage, or
-// nullptr when no operator has one available. Operators are scanned at most
-// once each, starting at 'start_operator_index' and wrapping around. For each
-// unfinished operator the preference order is: normal then rebuild WorkOrder
-// on 'numa_node' (when a specific node is requested), followed by normal then
-// rebuild WorkOrder from any NUMA node.
-WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
-    const dag_node_index start_operator_index, const numa_node_id numa_node) {
-  // Default policy: Operator with lowest index first.
-  dag_node_index index = start_operator_index;
-  // std::size_t is spelled out: <cstddef> only guarantees the std:: name.
-  for (std::size_t num_operators_checked = 0;
-       num_operators_checked < num_operators_in_dag_;
-       ++num_operators_checked, index = (index + 1) % num_operators_in_dag_) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
-    }
-
-    WorkOrder *work_order = nullptr;
-    if (numa_node != kAnyNUMANodeID) {
-      // First try to get a normal WorkOrder from the specified NUMA node.
-      work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
-      if (work_order != nullptr) {
-        // Only normal WorkOrders are counted as queued.
-        query_exec_state_->incrementNumQueuedWorkOrders(index);
-        return WorkerMessage::WorkOrderMessage(work_order, index);
-      }
-
-      // Normal WorkOrder not found on this node. Look for a rebuild WorkOrder
-      // on this NUMA node.
-      work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
-      if (work_order != nullptr) {
-        return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-      }
-    }
-
-    // Either no WorkOrder was found on the given NUMA node, or numa_node is
-    // 'kAnyNUMANodeID'. Try to get a normal WorkOrder from any NUMA node.
-    work_order = workorders_container_->getNormalWorkOrder(index);
-    if (work_order != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-      return WorkerMessage::WorkOrderMessage(work_order, index);
-    }
-
-    // Normal WorkOrder not found, look for a RebuildWorkOrder.
-    work_order = workorders_container_->getRebuildWorkOrder(index);
-    if (work_order != nullptr) {
-      return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-    }
-  }
-  // No WorkOrders available right now.
-  return nullptr;
-}
-
-// Asks the operator at 'index' for its normal WorkOrders and returns true iff
-// the number of pending normal WorkOrders for that operator grew as a result.
-bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
-  // Nothing to fetch once the operator has reported that generation is done.
-  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    return false;
-  }
-
-  // Do not fetch any work units until all blocking dependencies are met.
-  // The relational operator is not aware of blocking dependencies for
-  // uncorrelated scalar queries.
-  if (!checkAllBlockingDependenciesMet(index)) {
-    return false;
-  }
-
-  // std::size_t is spelled out: <cstddef> only guarantees the std:: name.
-  const std::size_t num_pending_workorders_before =
-      workorders_container_->getNumNormalWorkOrders(index);
-  const bool done_generation =
-      query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
-                                                                 query_context_.get(),
-                                                                 storage_manager_,
-                                                                 foreman_client_id_,
-                                                                 bus_);
-  if (done_generation) {
-    query_exec_state_->setDoneGenerationWorkOrders(index);
-  }
-
-  // TODO(shoban): It would be a good check to see if operator is making
-  // useful progress, i.e., the operator either generates work orders to
-  // execute or still has pending work orders executing. However, this will not
-  // work if Foreman polls operators without feeding data. This check can be
-  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-  // pending work orders are completed or new input blocks feed.
-
-  return num_pending_workorders_before <
-         workorders_container_->getNumNormalWorkOrders(index);
-}
-
-bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
-  // Preconditions: no rebuild WorkOrder exists yet for this operator, a
-  // rebuild is required, and it has not been initiated before.
-  DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
-  DCHECK(checkRebuildRequired(index));
-  DCHECK(!checkRebuildInitiated(index));
-
-  // Generate the rebuild WorkOrders straight into our own container.
-  getRebuildWorkOrders(index, workorders_container_.get());
-
-  // Record how many were generated and flag the rebuild as initiated.
-  const auto num_generated = workorders_container_->getNumRebuildWorkOrders(index);
-  query_exec_state_->setRebuildStatus(index, num_generated, true);
-
-  // The rebuild is over immediately when there is nothing to rebuild.
-  const bool rebuild_already_over =
-      (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  return rebuild_already_over;
-}
-
-void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
-                                                  WorkOrdersContainer *container) {
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-
-  // Operators without an insert destination produce nothing to rebuild.
-  const QueryContext::insert_destination_id destination_id = op.getInsertDestinationID();
-  if (destination_id == QueryContext::kInvalidInsertDestinationId) {
-    return;
-  }
-
-  DCHECK(query_context_ != nullptr);
-  InsertDestination *destination = query_context_->getInsertDestination(destination_id);
-  DCHECK(destination != nullptr);
-
-  // Collect the partially filled blocks from the insert destination.
-  std::vector<MutableBlockReference> partial_blocks;
-  destination->getPartiallyFilledBlocks(&partial_blocks);
-
-  // Create one RebuildWorkOrder per collected block; each block reference is
-  // moved into its WorkOrder.
-  for (MutableBlockReference &block_ref : partial_blocks) {
-    container->addRebuildWorkOrder(
-        new RebuildWorkOrder(query_id_,
-                             std::move(block_ref),
-                             index,
-                             op.getOutputRelationID(),
-                             foreman_client_id_,
-                             bus_),
-        index);
-  }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
deleted file mode 100644
index dd044a5..0000000
--- a/query_execution/QueryManagerSingleNode.hpp
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
-
-#include <cstddef>
-#include <memory>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryManagerBase.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-
-namespace tmb { class MessageBus; }
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class that manages the execution of a query including generation
- *        of new work orders, keeping track of the query execution state.
- **/
-class QueryManagerSingleNode final : public QueryManagerBase {
- public:
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the foreman thread.
-   * @param num_numa_nodes The number of NUMA nodes used by the system.
-   * @param query_handle The QueryHandle object for this query.
-   * @param catalog_database The CatalogDatabase used by the query.
-   * @param storage_manager The StorageManager used by the query.
-   * @param bus The TMB used for communication.
-   **/
-  QueryManagerSingleNode(const tmb::client_id foreman_client_id,
-                         const std::size_t num_numa_nodes,
-                         QueryHandle *query_handle,
-                         CatalogDatabaseLite *catalog_database,
-                         StorageManager *storage_manager,
-                         tmb::MessageBus *bus);
-
-  ~QueryManagerSingleNode() override {}
-
-  bool fetchNormalWorkOrders(const dag_node_index index) override;
-
-  /**
-   * @brief Get the next workorder to be executed, wrapped in a WorkerMessage.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   * @param numa_node The next WorkOrder should preferably have its input(s)
-   *        from this numa_node. This is a hint and not a binding requirement.
-   *
-   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
-   *         executed, return NULL.
-   **/
-  // The parameter is named 'numa_node' so that the declaration agrees with
-  // both the documentation above and the out-of-line definition.
-  WorkerMessage* getNextWorkerMessage(
-      const dag_node_index start_operator_index,
-      const numa_node_id numa_node = kAnyNUMANodeID);
-
-  /**
-   * @brief Get a pointer to the QueryContext.
-   **/
-  inline QueryContext* getQueryContextMutable() {
-    return query_context_.get();
-  }
-
- private:
-  // Normal execution is over when all dependencies are met, no normal
-  // WorkOrder is pending in the container, no WorkOrder is queued, and the
-  // operator is done generating WorkOrders.
-  bool checkNormalExecutionOver(const dag_node_index index) const override {
-    return (checkAllDependenciesMet(index) &&
-            !workorders_container_->hasNormalWorkOrder(index) &&
-            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
-            query_exec_state_->hasDoneGenerationWorkOrders(index));
-  }
-
-  bool initiateRebuild(const dag_node_index index) override;
-
-  // The rebuild is over once it has been initiated, no rebuild WorkOrder is
-  // left in the container, and the recorded number of rebuild WorkOrders is
-  // zero.
-  bool checkRebuildOver(const dag_node_index index) const override {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           !workorders_container_->hasRebuildWorkOrder(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  }
-
-  /**
-   * @brief Get the rebuild WorkOrders for an operator.
-   *
-   * @note This function should be called only once, when all the normal
-   *       WorkOrders generated by an operator finish their execution.
-   *
-   * @param index The index of the operator in the query plan DAG.
-   * @param container A pointer to a WorkOrdersContainer to be used to store the
-   *        generated WorkOrders.
-   **/
-  void getRebuildWorkOrders(const dag_node_index index,
-                            WorkOrdersContainer *container);
-
-  const tmb::client_id foreman_client_id_;
-
-  StorageManager *storage_manager_;
-  tmb::MessageBus *bus_;
-
-  std::unique_ptr<QueryContext> query_context_;
-
-  std::unique_ptr<WorkOrdersContainer> workorders_container_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/README.md
----------------------------------------------------------------------
diff --git a/query_execution/README.md b/query_execution/README.md
deleted file mode 100644
index 12e0f57..0000000
--- a/query_execution/README.md
+++ /dev/null
@@ -1,149 +0,0 @@
-# An Overview of Quickstep's Execution Engine
-
-## Types of threads
-There are two kinds of threads in Quickstep - Foreman and Worker. The foreman
-thread controls the query execution progress, finds schedulable work (called a
-WorkOrder) and assigns (or schedules) it to the Worker threads for execution.
-The Worker threads receive the WorkOrders and execute them. After execution they
-send a completion message (or response message) back to Foreman.
-
-## High level functionality of Foreman
-Foreman requests all the RelationalOperators in the physical query plan
-represented as a DAG to give any schedulable work (in the form of WorkOrders).
-While doing so, Foreman has to respect dependencies between operators. There are
-two kinds of dependencies between operators - pipeline breaking (or blocking)
-and pipeline non-breaking (or non-blocking). In the first case, the output of
-the producer operator can't be pipelined to the consumer operator. In the second
-case, the Foreman will facilitate the pipelining of the intermediate output
-produced by the producer operator to the consumer operator.
-
-## Messages in execution engine
-
-### WorkerMessage
-There are multiple types of WorkerMessage, each of which indicates the purpose
-of the message.
-
-Foreman -> Worker : WorkerMessage which consists of the following things
-- A pointer to the WorkOrder to be executed. The WorkOrder could be a normal
-WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of
-WorkOrder::execute() method which is overridden by all of the RelationalOperator
-classes. A rebuild WorkOrder has one StorageBlock as input and calls a
-rebuild() method on the block. More details about rebuild() can be found in the
-storage module.
-- The index of the relational operator in the query plan DAG that produced the
-WorkOrder.
-
-### ForemanMessage
-Multiple senders are possible for this message. There are multiple types of
-ForemanMessages, each of which indicates the purpose of the message.
-
-Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
-RebuildCompletion are sent after a Worker finishes executing a respective type
-of WorkOrder. This message helps the Foreman track the progress of individual
-operators as well as the whole query.
-
-Some relational operators and InsertDestination -> Foreman : ForemanMessage of
-types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
-when an output block of a relational operator gets full. Once a block is full,
-it streams the unique block ID of the filled block along with the index of the
-relational operator that produced the block to Foreman with the message type
-DataPipeline. Some operators which modify the block in place also send similar
-messages to Foreman.
-
-### FeedbackMessage
-This message is sent from Workers to the Foreman during a WorkOrder execution.
-
-In certain operators, e.g. TextScan (used for bulk loading data from text files)
-and Sort, there is a communication between the relational operator and its
-WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
-thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman.
-Foreman relays this message to the relational operator that produced the sender
-WorkOrder. The relational operator uses this message to update its internal
-state to potentially generate newer WorkOrders.
-
-### PoisonMessage
-This message is used to terminate a thread (i.e., Foreman and Worker), typically
-when shutting down the Quickstep process.
-
-## How does the Foreman react after receiving various messages?
-### WorkOrder completion message
-* Update the book-keeping of pending WorkOrders per Worker and per operator.
-* Fetch new WorkOrders, if available, for the operator whose WorkOrder was
-just executed.
-* Update the state of an operator - the possible options are:
- - Normal WorkOrders are still under execution
- - All normal WorkOrders have finished execution and rebuild WorkOrders are yet
- to be generated.
- - All normal WorkOrders have finished execution, rebuild WorkOrders have been
- generated and issued to Workers.
- - All normal and rebuild WorkOrders have been executed AND all the dependency
- operators for the given operator have finished execution, therefore the given
- operator has finished its execution.
-* Fetch the WorkOrders from the dependents of the given operator.
-
-### Rebuild WorkOrder completion message
-* Update the book-keeping of pending WorkOrders per Worker and per operator.
-* If all the rebuild WorkOrders have finished their execution, try to fetch the
-WorkOrders of the dependent operators of the operator whose rebuild WorkOrder
-was just executed.
-
-### Data pipeline message
-* Find the consumer operators (i.e. operators which have a non
-pipeline-breaking link) of the producer operator.
-* Stream the block ID to the eligible consumer operators.
-* Fetch new WorkOrders from these consumer operators which may have become
-available because of the streaming of data.
-
-### WorkOrder available message
-* Fetch new WorkOrders that may have become available.
-
-### Feedback message
-* Relay the feedback message to a specified relational operator. The recipient
-operator is specified in the header of the message.
-
-## Example
-We look at a sample query to better describe the flow of messages -
-
-SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;
-
-This is an equi-join query which can be implemented using a hash join. We assume
-that S is a larger relation and the build relation is the output of the
-selection on R.
-
-The query execution plan involves the following operators:
-* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
-R')
-* BuildHashOperator to construct a hash table on R'
-* HashJoinOperator to probe the hash table, where the probe relation is S
-* DestroyHashTableOperator to destroy the hash table after the join is done
-* Multiple DropTableOperators to destroy the temporary relations produced as
-output.
-
-R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4.
-We assume that the SelectOperator produces one filled block and one partially
-filled block as output. Note that in the query plan DAG, the link between
-SelectOperator and BuildHashOperator allows streaming of data. The
-HashJoinOperator's WorkOrder can't be generated unless all of the
-BuildHashOperator's WorkOrders have finished their execution. The execution is
-assumed to be performed by a single Worker thread.
-
-The following table describes the message exchange that happens during the
-query execution. We primarily focus on three operators - Select, BuildHash and
-HashJoin (probe).
-
-| Sender | Receiver | Message | Message Description |
-|:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 1. |
-| InsertDestination | Foreman | ForemanMessage of type kDataPipeline | SelectWorkOrder on block 1 produced one fully filled block as output. The output block ID is pipelined from the InsertDestination to Foreman. Foreman relays this block ID to BuildHashOperator, which generates a WorkOrder which is ready to be scheduled. |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 1 completed. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 2. |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 2 completed. As a result of this execution, a partially filled block of output was produced. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the fully filled block of R' |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the partially filled block of R' |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 3 from S |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |
-| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 4 from S |
-| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. |
-
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
deleted file mode 100644
index ed4bade..0000000
--- a/query_execution/Shiftboss.cpp
+++ /dev/null
@@ -1,400 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_execution/Shiftboss.hpp"
-
-#include <cstddef>
-#include <cstdlib>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/WorkOrderFactory.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageManager.hpp"
-#include "threading/ThreadUtil.hpp"
-
-#include "glog/logging.h"
-
-#include "tmb/address.h"
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/message_style.h"
-#include "tmb/tagged_message.h"
-
-using std::free;
-using std::malloc;
-using std::move;
-using std::size_t;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::AnnotatedMessage;
-using tmb::MessageBus;
-using tmb::TaggedMessage;
-
-namespace quickstep {
-
-class WorkOrder;
-
-// Main event loop of the Shiftboss thread.
-//
-// Optionally pins the thread to 'cpu_id_', consumes the registration response
-// (see processShiftbossRegistrationResponseMessage()), and then dispatches
-// every received TMB message by its type until a poison message arrives:
-//   - query initiation and rebuild initiation are delegated to the matching
-//     process*() helpers;
-//   - a work order is reconstructed from its proto and sent to the worker
-//     picked by getSchedulableWorker();
-//   - messages coming back from workers are relayed unchanged to the Foreman;
-//   - teardown and save-result messages drop the query's QueryContext;
-//   - the poison message is broadcast to 'worker_addresses_', after which
-//     this method returns.
-// Any other message type is fatal.
-void Shiftboss::run() {
-  if (cpu_id_ >= 0) {
-    // We can pin the shiftboss thread to a CPU if specified.
-    ThreadUtil::BindToCPU(cpu_id_);
-  }
-
-  // Must run before the loop: it sets 'foreman_client_id_' and
-  // 'shiftboss_index_', both of which are used below.
-  processShiftbossRegistrationResponseMessage();
-
-  for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until next
-    // message is received.
-    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') received the typed '" << annotated_message.tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-    switch (annotated_message.tagged_message.message_type()) {
-      case kQueryInitiateMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryInitiateMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
-        break;
-      }
-      case kWorkOrderMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::WorkOrderMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        // Look the context up with find() instead of operator[]: the latter
-        // silently inserts a null entry for an unknown query id, which would
-        // hand a null QueryContext to the factory once DCHECKs are compiled
-        // out.
-        const std::size_t query_id = proto.query_id();
-        const auto query_context_it = query_contexts_.find(query_id);
-        CHECK(query_context_it != query_contexts_.end())
-            << "No QueryContext registered for query " << query_id;
-
-        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
-                                                                       shiftboss_index_,
-                                                                       &database_cache_,
-                                                                       query_context_it->second.get(),
-                                                                       storage_manager_,
-                                                                       shiftboss_client_id_,
-                                                                       bus_);
-
-        unique_ptr<WorkerMessage> worker_message(
-            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
-        TaggedMessage worker_tagged_message(worker_message.get(),
-                                            sizeof(*worker_message),
-                                            kWorkOrderMessage);
-
-        const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                   << "') from Foreman to worker " << worker_index;
-
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               workers_->getClientID(worker_index),
-                                               move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kInitiateRebuildMessage: {
-        // Construct rebuild work orders, and send back their number to
-        // 'ForemanDistributed'.
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::InitiateRebuildMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processInitiateRebuildMessage(proto.query_id(),
-                                      proto.operator_index(),
-                                      proto.insert_destination_index(),
-                                      proto.relation_id());
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through.
-      case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrderCompleteMessage:
-      case kRebuildWorkOrderCompleteMessage: {
-        // These messages are not interpreted here; they are passed on to the
-        // Foreman as received.
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                   << "' message from Worker with TMB client ID '" << annotated_message.sender
-                   << "' to Foreman with TMB client ID " << foreman_client_id_;
-
-        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kQueryTeardownMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryTeardownMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        query_contexts_.erase(proto.query_id());
-        break;
-      }
-      case kSaveQueryResultMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::SaveQueryResultMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        for (int i = 0; i < proto.blocks_size(); ++i) {
-          storage_manager_->saveBlockOrBlob(proto.blocks(i));
-        }
-
-        // Clean up query execution states, i.e., QueryContext.
-        query_contexts_.erase(proto.query_id());
-
-        serialization::SaveQueryResultResponseMessage proto_response;
-        proto_response.set_query_id(proto.query_id());
-        proto_response.set_relation_id(proto.relation_id());
-        proto_response.set_cli_id(proto.cli_id());
-        proto_response.set_shiftboss_index(shiftboss_index_);
-
-        const size_t proto_response_length = proto_response.ByteSize();
-        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-        // NOTE(review): the buffer is released right after the message is
-        // built, so TaggedMessage is expected to keep its own copy of the
-        // bytes -- confirm against the TMB API.
-        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                       proto_response_length,
-                                       kSaveQueryResultResponseMessage);
-        free(proto_response_bytes);
-
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                   << "') to Foreman with TMB client ID " << foreman_client_id_;
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                   << "') from Foreman to all workers";
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(shiftboss_client_id_,
-                       worker_addresses_,
-                       broadcast_style,
-                       move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        // The poison message ends the event loop.
-        return;
-      }
-      default: {
-        LOG(FATAL) << "Unknown TMB message type";
-      }
-    }
-  }
-}
-
-// Picks the worker that should receive the next work order.
-//
-// Probing starts at 'start_worker_index_' and advances cyclically until a
-// worker with fewer than 'max_msgs_per_worker_' queued work orders is found.
-// The index following the chosen worker becomes the starting point of the
-// next call. The probe does not terminate while every worker is at the limit.
-//
-// Returns the index of the chosen worker.
-size_t Shiftboss::getSchedulableWorker() {
-  const size_t worker_count = workers_->getNumWorkers();
-
-  // Skip over workers whose queue is already full.
-  size_t candidate = start_worker_index_;
-  while (workers_->getNumQueuedWorkOrders(candidate) >= max_msgs_per_worker_) {
-    candidate = (candidate + 1) % worker_count;
-  }
-
-  start_worker_index_ = (candidate + 1) % worker_count;
-  // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(candidate);
-  // But we need a WorkOrder queue first.
-  return candidate;
-}
-
-// Announces this Shiftboss by sending a ShiftbossRegistrationMessage, which
-// carries getWorkOrderCapacity(), to all TMB addresses.
-//
-// The send status is verified with CHECK, like every other send in this
-// file, so that a failed registration aborts in all build modes instead of
-// being ignored when DCHECKs are compiled out.
-void Shiftboss::registerWithForeman() {
-  tmb::Address all_addresses;
-  all_addresses.All(true);
-
-  tmb::MessageStyle style;
-
-  serialization::ShiftbossRegistrationMessage proto;
-  proto.set_work_order_capacity(getWorkOrderCapacity());
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  // NOTE(review): the buffer is released right after the message is built, so
-  // TaggedMessage is expected to keep its own copy of the bytes -- confirm
-  // against the TMB API.
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kShiftbossRegistrationMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
-             << "') to all";
-  const MessageBus::SendStatus send_status =
-      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
-  CHECK(send_status == MessageBus::SendStatus::kOK);
-}
-
-// Waits for the next message addressed to this Shiftboss, which is expected
-// to be the ShiftbossRegistrationResponseMessage.
-//
-// The sender of that message is recorded as 'foreman_client_id_', the
-// 'shiftboss_index_' is taken from its payload, and the message itself is
-// then broadcast to 'worker_addresses_'.
-void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  AnnotatedMessage response(bus_->Receive(shiftboss_client_id_, 0, true));
-  DCHECK_EQ(kShiftbossRegistrationResponseMessage, response.tagged_message.message_type());
-
-  // Whoever answered the registration is the Foreman to report to.
-  foreman_client_id_ = response.sender;
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-             << "') received the typed '" << kShiftbossRegistrationResponseMessage
-             << "' message from ForemanDistributed with client " << foreman_client_id_;
-
-  serialization::ShiftbossRegistrationResponseMessage response_proto;
-  CHECK(response_proto.ParseFromArray(response.tagged_message.message(),
-                                      response.tagged_message.message_bytes()));
-  shiftboss_index_ = response_proto.shiftboss_index();
-
-  // Forward this message to Workers regarding <shiftboss_index_>.
-  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
-                                       worker_addresses_,
-                                       move(response.tagged_message),
-                                       bus_);
-}
-
-// Prepares this Shiftboss for executing query 'query_id'.
-//
-// Updates 'database_cache_' from 'catalog_database_cache_proto', builds a
-// QueryContext from 'query_context_proto' and stores it under 'query_id',
-// then sends a QueryInitiateResponseMessage for that query to the Foreman.
-void Shiftboss::processQueryInitiateMessage(
-    const std::size_t query_id,
-    const serialization::CatalogDatabase &catalog_database_cache_proto,
-    const serialization::QueryContext &query_context_proto) {
-  database_cache_.update(catalog_database_cache_proto);
-
-  query_contexts_.emplace(
-      query_id,
-      std::make_unique<QueryContext>(
-          query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_));
-
-  // Serialize the acknowledgement into a temporary buffer.
-  serialization::QueryInitiateResponseMessage response_proto;
-  response_proto.set_query_id(query_id);
-
-  const size_t response_length = response_proto.ByteSize();
-  char *response_bytes = static_cast<char*>(malloc(response_length));
-  CHECK(response_proto.SerializeToArray(response_bytes, response_length));
-
-  TaggedMessage response_message(static_cast<const void*>(response_bytes),
-                                 response_length,
-                                 kQueryInitiateResponseMessage);
-  free(response_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-             << "') to Foreman with TMB client ID " << foreman_client_id_;
-  const MessageBus::SendStatus status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
-                                         foreman_client_id_,
-                                         move(response_message));
-  CHECK(status == MessageBus::SendStatus::kOK);
-}
-
-// Starts the rebuild phase for operator 'op_index' of query 'query_id'.
-//
-// Collects the partially filled blocks of the insert destination
-// 'dest_index', reports their number to the Foreman in an
-// InitiateRebuildResponseMessage, and then sends one RebuildWorkOrder per
-// block (for relation 'rel_id') to a worker chosen by getSchedulableWorker().
-//
-// Aborts if no QueryContext is registered for 'query_id'.
-void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
-                                              const std::size_t op_index,
-                                              const QueryContext::insert_destination_id dest_index,
-                                              const relation_id rel_id) {
-  DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-
-  // Look the context up with find() instead of operator[]: the latter
-  // silently inserts a null entry for an unknown query id, which would be
-  // dereferenced right below once DCHECKs are compiled out.
-  const auto query_context_it = query_contexts_.find(query_id);
-  CHECK(query_context_it != query_contexts_.end())
-      << "No QueryContext registered for query " << query_id;
-  InsertDestination *insert_destination = query_context_it->second->getInsertDestination(dest_index);
-  DCHECK(insert_destination != nullptr);
-
-  vector<MutableBlockReference> partially_filled_block_refs;
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
-  // Tell the Foreman how many rebuild work orders to expect before any of
-  // them is dispatched.
-  serialization::InitiateRebuildResponseMessage proto;
-  proto.set_query_id(query_id);
-  proto.set_operator_index(op_index);
-  proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
-  proto.set_shiftboss_index(shiftboss_index_);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message_response(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kInitiateRebuildResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-             << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
-             << "') to Foreman with TMB client ID " << foreman_client_id_;
-  const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
-                                         foreman_client_id_,
-                                         move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK);
-
-  for (MutableBlockReference &block_ref : partially_filled_block_refs) {
-    // NOTE(zuyu): Worker releases the memory after the execution of
-    // RebuildWorkOrder on the Worker.
-    WorkOrder *rebuild_work_order =
-        new RebuildWorkOrder(query_id,
-                             move(block_ref),
-                             op_index,
-                             rel_id,
-                             shiftboss_client_id_,
-                             bus_);
-
-    unique_ptr<WorkerMessage> worker_message(
-        WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
-
-    TaggedMessage worker_tagged_message(worker_message.get(),
-                                        sizeof(*worker_message),
-                                        kRebuildWorkOrderMessage);
-
-    const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
-               << "') to worker " << worker_index;
-
-    const MessageBus::SendStatus worker_send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
-                                           workers_->getClientID(worker_index),
-                                           move(worker_tagged_message));
-    CHECK(worker_send_status == MessageBus::SendStatus::kOK);
-  }
-}
-
-} // namespace quickstep