You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/06/16 04:20:09 UTC
[10/20] incubator-quickstep git commit: Long lived Foreman thread
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
new file mode 100644
index 0000000..2145429
--- /dev/null
+++ b/query_execution/PolicyEnforcer.cpp
@@ -0,0 +1,183 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+ " can be allocated in a single round of dispatch of messages to"
+ " the workers.");
+
+bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+ if (admitted_queries_.size() < kMaxConcurrentQueries) {
+ // Ok to admit the query.
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+ // Query with the same ID not present, ok to admit.
+ admitted_queries_[query_id].reset(
+ new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+ catalog_database_, storage_manager_, bus_));
+ return true;
+ } else {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+ return false;
+ }
+ } else {
+ // This query will have to wait.
+ waiting_queries_.push(query_handle);
+ return false;
+ }
+}
+
+void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+ // TODO(harshad) : Provide processXMessage() public functions in
+ // QueryManager, so that we need to extract message from the
+ // TaggedMessage only once.
+ std::size_t query_id;
+ switch (tagged_message.message_type()) {
+ case kWorkOrderCompleteMessage: // Fall through.
+ case kRebuildWorkOrderCompleteMessage: {
+ serialization::WorkOrderCompletionMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: {
+ serialization::CatalogRelationNewBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kDataPipelineMessage: {
+ serialization::DataPipelineMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrdersAvailableMessage: {
+ serialization::WorkOrdersAvailableMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrderFeedbackMessage: {
+ WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
+ query_id = msg.header().query_id;
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+ }
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ const QueryManager::QueryStatusCode return_code =
+ admitted_queries_[query_id]->processMessage(tagged_message);
+ if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PolicyEnforcer::getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+ // Iterate over admitted queries until either there are no more
+ // messages available, or the maximum number of messages have
+ // been collected.
+ DCHECK(worker_messages->empty());
+ // TODO(harshad) - Make this function generic enough so that it
+ // works well when multiple queries are getting executed.
+ std::size_t per_query_share = 0;
+ if (!admitted_queries_.empty()) {
+ per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+ } else {
+ LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ return;
+ }
+ DCHECK_GT(per_query_share, 0u);
+ std::vector<std::size_t> finished_queries_ids;
+
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManager *curr_query_manager = admitted_query_info.second.get();
+ DCHECK(curr_query_manager != nullptr);
+ std::size_t messages_collected_curr_query = 0;
+ while (messages_collected_curr_query < per_query_share) {
+ WorkerMessage *next_worker_message =
+ curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+ if (next_worker_message != nullptr) {
+ ++messages_collected_curr_query;
+ worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ } else {
+ // No more work ordes from the current query at this time.
+ // Check if the query's execution is over.
+ if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ // If the query has been executed, remove it.
+ finished_queries_ids.push_back(admitted_query_info.first);
+ }
+ break;
+ }
+ }
+ }
+ for (const std::size_t finished_qid : finished_queries_ids) {
+ removeQuery(finished_qid);
+ }
+}
+
+void PolicyEnforcer::removeQuery(const std::size_t query_id) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+ LOG(WARNING) << "Removing query with ID " << query_id
+ << " that hasn't finished its execution";
+ }
+ admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcer::admitQueries(
+ const std::vector<QueryHandle*> &query_handles) {
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
new file mode 100644
index 0000000..5915b79
--- /dev/null
+++ b/query_execution/PolicyEnforcer.hpp
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ * in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcer {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param foreman_client_id The TMB client ID of the Foreman.
+ * @param num_numa_nodes Number of NUMA nodes used by the system.
+ * @param catalog_database The CatalogDatabase used.
+ * @param storage_manager The StorageManager used.
+ * @param bus The TMB.
+ **/
+ PolicyEnforcer(const tmb::client_id foreman_client_id,
+ const std::size_t num_numa_nodes,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ tmb::MessageBus *bus)
+ : foreman_client_id_(foreman_client_id),
+ num_numa_nodes_(num_numa_nodes),
+ catalog_database_(catalog_database),
+ storage_manager_(storage_manager),
+ bus_(bus) {}
+
+ /**
+ * @brief Destructor.
+ **/
+ ~PolicyEnforcer() {
+ if (hasQueries()) {
+ LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+ "waiting queries";
+ }
+ }
+
+ /**
+ * @brief Admit a query to the system.
+ *
+ * @param query_handle The QueryHandle for the new query.
+ *
+ * @return Whether the query was admitted to the system.
+ **/
+ bool admitQuery(QueryHandle *query_handle);
+
+ /**
+ * @brief Admit multiple queries in the system.
+ *
+ * @note In the current simple implementation, we only allow one active
+ * query in the system. Other queries will have to wait.
+ *
+ * @param query_handles A vector of QueryHandles for the queries to be
+ * admitted.
+ *
+ * @return True if all the queries were admitted, false if at least one query
+ * was not admitted.
+ **/
+ bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+ /**
+ * @brief Remove a given query that is under execution.
+ *
+ * @note This function is made public so that it is possible for a query
+ * to be killed. Otherwise, it should only be used privately by the
+ * class.
+ *
+ * TODO(harshad) - Extend this function to support removal of waiting queries.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
+ void removeQuery(const std::size_t query_id);
+
+ /**
+ * @brief Get worker messages to be dispatched. These worker messages come
+ * from the active queries.
+ *
+ * @param worker_messages The worker messages to be dispatched.
+ **/
+ void getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ /**
+ * @brief Process a message sent to the Foreman, which gets passed on to the
+ * policy enforcer.
+ *
+ * @param message The message.
+ **/
+ void processMessage(const TaggedMessage &tagged_message);
+
+ /**
+ * @brief Check if there are any queries to be executed.
+ *
+ * @return True if there is at least one active or waiting query, false if
+ * the policy enforcer doesn't have any query.
+ **/
+ inline bool hasQueries() const {
+ return !(admitted_queries_.empty() && waiting_queries_.empty());
+ }
+
+ private:
+ static constexpr std::size_t kMaxConcurrentQueries = 1;
+
+ const tmb::client_id foreman_client_id_;
+ const std::size_t num_numa_nodes_;
+
+ CatalogDatabaseLite *catalog_database_;
+ StorageManager *storage_manager_;
+
+ tmb::MessageBus *bus_;
+
+ // Key = query ID, value = QueryManager* for the key query.
+ std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+ // The queries which haven't been admitted yet.
+ std::queue<QueryHandle*> waiting_queries_;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 3bfce17..54dd557 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -89,13 +89,13 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
for (int i = 0; i < proto.insert_destinations_size(); ++i) {
const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i);
- insert_destinations_.emplace_back(
- InsertDestination::ReconstructFromProto(insert_destination_proto,
- database.getRelationSchemaById(
- insert_destination_proto.relation_id()),
- storage_manager,
- scheduler_client_id,
- bus));
+ insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto(
+ proto.query_id(),
+ insert_destination_proto,
+ database.getRelationSchemaById(insert_destination_proto.relation_id()),
+ storage_manager,
+ scheduler_client_id,
+ bus));
}
for (int i = 0; i < proto.predicates_size(); ++i) {
@@ -231,7 +231,7 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
}
}
- return true;
+ return proto.IsInitialized();
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b37286c..98cd0b6 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -54,4 +54,6 @@ message QueryContext {
// NOTE(zuyu): For UpdateWorkOrder only.
repeated UpdateGroup update_groups = 10;
+
+ required uint64 query_id = 11;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 15803cf..9d9a9e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -27,6 +27,7 @@ message EmptyMessage {
message WorkOrderCompletionMessage {
required uint64 operator_index = 1;
required uint64 worker_thread_index = 2;
+ required uint64 query_id = 3;
}
message CatalogRelationNewBlockMessage {
@@ -35,16 +36,19 @@ message CatalogRelationNewBlockMessage {
// Used by PartitionAwareInsertDestination.
optional uint64 partition_id = 3;
+ required uint64 query_id = 4;
}
message DataPipelineMessage {
required uint64 operator_index = 1;
required fixed64 block_id = 2;
required int32 relation_id = 3;
+ required uint64 query_id = 4;
}
message WorkOrdersAvailableMessage {
required uint64 operator_index = 1;
+ required uint64 query_id = 2;
}
// BlockLocator related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fc253bc..9d1060f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -58,6 +58,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
+ kAdmitRequestMessage, // Requesting a query (or queries) to be admitted, from
+ // the main thread to Foreman.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
@@ -67,7 +69,8 @@ enum QueryExecutionMessageType : message_type_id {
// their corresponding RelationalOperators.
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
- kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers.
+ kWorkloadCompletionMessage, // From Foreman to main thread.
+ kPoisonMessage, // From the main thread to Foreman and Workers.
#ifdef QUICKSTEP_DISTRIBUTED
// BlockLocator related messages, sorted in a life cycle of StorageManager
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index a8b6a38..78fd159 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -17,9 +17,12 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
+#include <memory>
#include <utility>
+#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
#include "utility/Macros.hpp"
#include "tmb/address.h"
@@ -60,6 +63,55 @@ class QueryExecutionUtil {
std::move(tagged_message));
}
+ /**
+ * @brief Construct and send an AdmitRequestMessage from a given sender to a
+ * given recipient.
+ *
+ * @param sender_id The TMB client ID of the sender.
+ * @param receiver_id The TMB client ID of the receiver.
+ * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+ * @param bus A pointer to the TMB.
+ * @param tagged_message A moved from reference to the tagged message.
+ *
+ * @return A status code indicating the result of the message delivery.
+ * The caller should ensure that the status is SendStatus::kOK.
+ **/
+ static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+ const tmb::client_id sender_id,
+ const tmb::client_id receiver_id,
+ QueryHandle *query_handle,
+ MessageBus *bus) {
+ std::unique_ptr<AdmitRequestMessage> request_message(
+ new AdmitRequestMessage(query_handle));
+ const std::size_t size_of_request_msg = sizeof(*request_message);
+ TaggedMessage admit_tagged_message(
+ request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+ return QueryExecutionUtil::SendTMBMessage(
+ bus, sender_id, receiver_id, std::move(admit_tagged_message));
+ }
+
+ static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
+ // Terminate all threads.
+ // The sender thread broadcasts poison message to the workers and foreman.
+ // Each worker dies after receiving poison message. The order of workers'
+ // death is irrelavant.
+ MessageStyle style;
+ style.Broadcast(true);
+ Address address;
+ address.All(true);
+ std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+ TaggedMessage poison_tagged_message(poison_message.get(),
+ sizeof(*poison_message),
+ kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status = bus->Send(
+ sender_id, address, style, std::move(poison_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+ "Broadcast poison message from sender with TMB client ID " << sender_id
+ << " failed";
+ }
+
private:
/**
* @brief Constructor. Made private to avoid instantiation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
index 47f54c5..b52460f 100644
--- a/query_execution/QueryManager.hpp
+++ b/query_execution/QueryManager.hpp
@@ -25,18 +25,21 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/DAG.hpp"
#include "utility/Macros.hpp"
+#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
namespace quickstep {
class CatalogDatabaseLite;
-class ForemanMessage;
class QueryHandle;
class StorageManager;
class WorkerMessage;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index eb9aedd..d023daa 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -28,6 +28,8 @@
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
+#include "glog/logging.h"
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -76,7 +78,7 @@ class WorkOrdersContainer {
* @return If there are pending WorkOrders.
**/
inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].hasWorkOrder();
}
@@ -92,9 +94,9 @@ class WorkOrdersContainer {
**/
inline bool hasNormalWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
@@ -108,7 +110,7 @@ class WorkOrdersContainer {
* @return If there are pending rebuild WorkOrders.
**/
inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].hasWorkOrder();
}
@@ -124,9 +126,9 @@ class WorkOrdersContainer {
**/
inline bool hasRebuildWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
@@ -144,9 +146,9 @@ class WorkOrdersContainer {
**/
WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
@@ -164,7 +166,7 @@ class WorkOrdersContainer {
**/
WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
@@ -182,9 +184,9 @@ class WorkOrdersContainer {
**/
WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
@@ -202,7 +204,7 @@ class WorkOrdersContainer {
**/
WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
@@ -220,8 +222,8 @@ class WorkOrdersContainer {
* @param operator_index The index of the operator in the query DAG.
**/
void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK_LT(operator_index, num_operators_);
normal_workorders_[operator_index].addWorkOrder(workorder);
}
@@ -238,8 +240,8 @@ class WorkOrdersContainer {
**/
void addRebuildWorkOrder(WorkOrder *workorder,
const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK_LT(operator_index, num_operators_);
rebuild_workorders_[operator_index].addWorkOrder(workorder);
}
@@ -254,9 +256,9 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumNormalWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
@@ -271,7 +273,7 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumNormalWorkOrders(
const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].getNumWorkOrders();
}
@@ -286,9 +288,9 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumRebuildWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_LT(operator_index, num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
@@ -303,7 +305,7 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumRebuildWorkOrders(
const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].getNumWorkOrders();
}
@@ -418,8 +420,8 @@ class WorkOrdersContainer {
void addWorkOrder(WorkOrder *workorder);
bool hasWorkOrderForNUMANode(const int numa_node_id) const {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
numa_node_id);
@@ -440,8 +442,8 @@ class WorkOrdersContainer {
std::size_t getNumWorkOrdersForNUMANode(
const int numa_node_id) const {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
numa_node_id);
@@ -463,8 +465,8 @@ class WorkOrdersContainer {
}
WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
if (work_order == nullptr) {
work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 645fd05..ef596e1 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -58,11 +58,14 @@ void Worker::run() {
WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
DCHECK(message.getWorkOrder() != nullptr);
message.getWorkOrder()->execute();
+ const std::size_t query_id_for_workorder =
+ message.getWorkOrder()->getQueryID();
delete message.getWorkOrder();
- sendWorkOrderCompleteMessage(annotated_msg.sender,
- message.getRelationalOpIndex(),
- tagged_message.message_type() == kRebuildWorkOrderMessage);
+ sendWorkOrderCompleteMessage(
+ annotated_msg.sender, message.getRelationalOpIndex(),
+ query_id_for_workorder,
+ tagged_message.message_type() == kRebuildWorkOrderMessage);
break;
}
case kPoisonMessage: {
@@ -76,10 +79,12 @@ void Worker::run() {
void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
const size_t op_index,
+ const size_t query_id,
const bool is_rebuild_work_order) {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(op_index);
proto.set_worker_thread_index(worker_thread_index_);
+ proto.set_query_id(query_id);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index b94e937..c0bafdc 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -97,11 +97,13 @@ class Worker : public Thread {
*
* @param receiver The id of the TMB client which should receive the response.
* @param op_index The index of the operator to which the WorkOrder belongs.
+ * @param query_id The ID of the query which the WorkOrder belongs to.
* @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
* false.
**/
void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
const std::size_t op_index,
+ const std::size_t query_id,
const bool is_rebuild_work_order);
const std::size_t worker_thread_index_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index ec63af9..560c1ba 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -30,6 +30,8 @@ class WorkOrder;
**/
class WorkerMessage {
public:
+ static constexpr int kInvalidRecipientIndexHint = -1;
+
enum class WorkerMessageType {
kRebuildWorkOrder = 0,
kWorkOrder,
@@ -105,6 +107,23 @@ class WorkerMessage {
return type_;
}
+ /**
+ * @brief Set a hint for the recipient worker thread.
+ *
+ * @param recipient_index_hint The hint i.e. the worker thread index.
+ **/
+ inline void setRecipientHint(const int recipient_index_hint) {
+ recipient_index_hint_ = recipient_index_hint;
+ }
+
+ /**
+ * @brief Get the hint for the recipient worker thread. The hint is invalid if
+ * it is kInvalidRecipientIndexHint.
+ **/
+ inline int getRecipientHint() const {
+ return recipient_index_hint_;
+ }
+
private:
/**
* @brief Constructor.
@@ -120,12 +139,13 @@ class WorkerMessage {
const WorkerMessageType type)
: work_unit_(work_unit),
relational_op_index_(relational_op_index),
- type_(type) {
- }
+ type_(type),
+ recipient_index_hint_(kInvalidRecipientIndexHint) {}
WorkOrder *work_unit_;
const std::size_t relational_op_index_;
const WorkerMessageType type_;
+ int recipient_index_hint_;
};
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/Foreman_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Foreman_unittest.cpp b/query_execution/tests/Foreman_unittest.cpp
deleted file mode 100644
index cbe5088..0000000
--- a/query_execution/tests/Foreman_unittest.cpp
+++ /dev/null
@@ -1,952 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include <climits>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/Foreman.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::client_id;
-
-namespace quickstep {
-
-class WorkOrderProtosContainer;
-
-class MockWorkOrder : public WorkOrder {
- public:
- explicit MockWorkOrder(const int op_index)
- : WorkOrder(0), op_index_(op_index) {}
-
- void execute() override {
- VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
- }
-
- inline QueryPlan::DAGNodeIndex getOpIndex() const {
- return op_index_;
- }
-
- private:
- const QueryPlan::DAGNodeIndex op_index_;
-
- DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
-};
-
-class MockOperator: public RelationalOperator {
- public:
- enum function_name {
- kFeedInputBlock = 0,
- kFeedInputBlocks,
- kDoneFeedingInputBlocks,
- kGetAllWorkOrders
- };
-
- MockOperator(const bool produce_workorders,
- const bool has_streaming_input,
- const int max_getworkorder_iters = 1,
- const int max_workorders = INT_MAX)
- : RelationalOperator(0 /* Query Id */),
- produce_workorders_(produce_workorders),
- has_streaming_input_(has_streaming_input),
- max_workorders_(max_workorders),
- max_getworkorder_iters_(max_getworkorder_iters),
- num_calls_get_workorders_(0),
- num_workorders_generated_(0),
- num_calls_feedblock_(0),
- num_calls_feedblocks_(0),
- num_calls_donefeedingblocks_(0) {
- }
-
-#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
-
- // The methods below are used to check whether Foreman calls the Relational
- // operator, how many times it calls a particular method etc.
- inline int getNumWorkOrders() const {
- return num_workorders_generated_;
- }
-
- inline int getNumCalls(const function_name fname) const {
- switch (fname) {
- case kFeedInputBlock:
- return num_calls_feedblock_;
- case kFeedInputBlocks:
- return num_calls_feedblocks_;
- case kDoneFeedingInputBlocks:
- return num_calls_donefeedingblocks_;
- case kGetAllWorkOrders:
- return num_calls_get_workorders_;
- default:
- return -1;
- }
- }
-
- inline bool getBlockingDependenciesMet() const {
- MOCK_OP_LOG(3) << "met.";
- return blocking_dependencies_met_;
- }
-
- void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
- insert_destination_index_ = insert_destination_index;
- }
-
- // Mock to trigger doneFeedingInputBlocks for the dependent operators
- // in Foreman::markOperatorFinished.
- void setOutputRelationID(const relation_id rel_id) {
- output_relation_id_ = rel_id;
- }
-
- // Override methods from the base class.
- bool getAllWorkOrders(
- WorkOrdersContainer *container,
- QueryContext *query_context,
- StorageManager *storage_manager,
- const tmb::client_id foreman_client_id,
- tmb::MessageBus *bus) override {
- ++num_calls_get_workorders_;
- if (produce_workorders_) {
- if (has_streaming_input_) {
- if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
- MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
- container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
- ++num_workorders_generated_;
- }
- } else {
- if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
- MOCK_OP_LOG(3) << "[static] generate WorkOrder";
- container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
- ++num_workorders_generated_;
- }
- }
- }
- MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
- << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
- return num_calls_get_workorders_ == max_getworkorder_iters_;
- }
-
- bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
- return true;
- }
-
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
- ++num_calls_feedblock_;
- MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
- }
-
- void feedInputBlocks(const relation_id rel_id,
- std::vector<block_id> *partially_filled_blocks) override {
- ++num_calls_feedblocks_;
- MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
- }
-
- void doneFeedingInputBlocks(const relation_id rel_id) override {
- ++num_calls_donefeedingblocks_;
- MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
- }
-
- QueryContext::insert_destination_id getInsertDestinationID() const override {
- return insert_destination_index_;
- }
-
- const relation_id getOutputRelationID() const override {
- return output_relation_id_;
- }
-
- private:
- const bool produce_workorders_;
- const bool has_streaming_input_;
- const int max_workorders_;
- const int max_getworkorder_iters_;
-
- int num_calls_get_workorders_;
- int num_workorders_generated_;
- int num_calls_feedblock_;
- int num_calls_feedblocks_;
- int num_calls_donefeedingblocks_;
-
- QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
-
- relation_id output_relation_id_ = -1;
-
-#undef MOCK_OP_LOG
-
- DISALLOW_COPY_AND_ASSIGN(MockOperator);
-};
-
-
-class ForemanTest : public ::testing::Test {
- protected:
- // Class ForemanTest is the friend of class Foreman. Each TEST_F behaves
- // as a separate class, so we can't access Foreman's private members in
- // TEST_F.
- virtual void SetUp() {
- db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
- storage_manager_.reset(new StorageManager("./"));
-
- query_plan_.reset(new QueryPlan());
-
- bus_.Initialize();
-
- foreman_.reset(new Foreman(&bus_, db_.get(), storage_manager_.get()));
-
- // This thread acts both as Foreman as well as Worker. Foreman connects to
- // the bus in its constructor.
- worker_client_id_ = bus_.Connect();
-
- // Register as sender and receiver for relevant types of messages.
- bus_.RegisterClientAsSender(worker_client_id_, kWorkOrderCompleteMessage);
- bus_.RegisterClientAsSender(worker_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_.RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
-
- std::vector<client_id> worker_client_ids;
- worker_client_ids.push_back(worker_client_id_);
-
- std::vector<numa_node_id> numa_nodes;
- numa_nodes.push_back(static_cast<numa_node_id>(-1));
-
- workers_.reset(new WorkerDirectory(1, worker_client_ids, numa_nodes));
- foreman_->setWorkerDirectory(workers_.get());
- }
-
- inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
- return foreman_->query_exec_state_->getNumQueuedWorkOrders(index);
- }
-
- inline const int getNumOperatorsFinished() const {
- return foreman_->query_exec_state_->getNumOperatorsFinished();
- }
-
- inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
- return foreman_->query_exec_state_->hasExecutionFinished(index);
- }
-
- inline bool popWorkOrderIfAvailable(MockWorkOrder **workorder) {
- AnnotatedMessage msg;
- if (bus_.ReceiveIfAvailable(worker_client_id_, &msg)) {
- WorkerMessage message(*static_cast<const WorkerMessage*>(msg.tagged_message.message()));
- *workorder = static_cast<MockWorkOrder*>(message.getWorkOrder());
- return true;
- }
- return false;
- }
-
- inline bool popRebuildWorkOrderIfAvailable(MockWorkOrder **workorder) {
- return popWorkOrderIfAvailable(workorder);
- }
-
- inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
- VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
- foreman_->processDataPipelineMessage(source_operator_index, 0 /* block_id */, 0 /* relation_id */);
- return foreman_->query_exec_state_->hasQueryExecutionFinished();
- }
-
- inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
- foreman_->processWorkOrderCompleteMessage(index, 0 /* worker id */);
- return foreman_->query_exec_state_->hasQueryExecutionFinished();
- }
-
- inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
- foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
- return foreman_->query_exec_state_->hasQueryExecutionFinished();
- }
-
- inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
- foreman_->processDataPipelineMessage(index,
- BlockIdUtil::GetBlockId(1 /* domain */, 1),
- 0 /* relation_id */);
- return foreman_->query_exec_state_->hasQueryExecutionFinished();
- }
-
- inline bool startForeman() {
- foreman_->initialize();
- return foreman_->query_exec_state_->hasQueryExecutionFinished();
- }
-
- inline int getWorkerInputQueueSize() {
- return bus_.CountQueuedMessagesForClient(worker_client_id_);
- }
-
- unique_ptr<CatalogDatabase> db_;
- unique_ptr<StorageManager> storage_manager_;
-
- unique_ptr<QueryPlan> query_plan_;
-
- unique_ptr<Foreman> foreman_;
- MessageBusImpl bus_;
-
- client_id worker_client_id_;
-
- unique_ptr<WorkerDirectory> workers_;
-};
-
-TEST_F(ForemanTest, SingleNodeDAGNoWorkOrdersTest) {
- // This test creates a DAG of a single node. No workorders are generated.
- query_plan_->addRelationalOperator(new MockOperator(false, false));
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(0));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // Foreman exits after initialize, since no workorders are generated.
- EXPECT_TRUE(startForeman());
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- // No workorder is generated. No response is received.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(ForemanTest, SingleNodeDAGStaticWorkOrdersTest) {
- // This test creates a DAG of a single node. Static workorders are generated.
- const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
- EXPECT_FALSE(startForeman());
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // One workorder is generated.
- EXPECT_EQ(1, getWorkerInputQueueSize());
- EXPECT_EQ(1, op.getNumWorkOrders());
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- EXPECT_EQ(id, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- // Send a message to Foreman upon workorder completion.
- // Last event processed by Foreman.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(1, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id));
-}
-
-TEST_F(ForemanTest, SingleNodeDAGDynamicWorkOrdersTest) {
- // This test creates a DAG of a single node. WorkOrders are generated
- // dynamically as pending work orders complete execution, i.e.,
- // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be
- // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to
- // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will insert no
- // WorkOrder and return true.
-
- // TODO(shoban): This test can not be more robust than this because of fixed
- // scaffolding of mocking. If we use gMock, we can do much better.
- const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- const MockOperator &op = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
- EXPECT_FALSE(startForeman());
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- for (int i = 0; i < 3; i++) {
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- // One workorder is generated.
- EXPECT_EQ(1, getWorkerInputQueueSize());
- EXPECT_EQ(i + 1, op.getNumWorkOrders());
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- EXPECT_EQ(id, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- if (i < 2) {
- // Send a message to Foreman upon workorder completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
- } else {
- // Send a message to Foreman upon workorder completion.
- // Last event.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
- }
- }
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-
- EXPECT_EQ(1, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id));
-
- // We place this check in the end, since it's true throughout the test.
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(ForemanTest, TwoNodesDAGBlockingLinkTest) {
- // We use two nodes in the DAG with a blocking link between them.
- // There is no streaming of data involved in this test.
- const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false));
- const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, false));
-
- // Create a blocking link.
- query_plan_->addDirectDependency(id2, id1, true);
-
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
- ->setOutputRelationID(0xdead);
-
- const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- // Make sure queues are empty initially.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- EXPECT_FALSE(startForeman());
-
- // op1 doesn't have any dependencies
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
- // Only op1 should receive a call to getAllWorkOrders initially.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // Only op1 should produce a workorder.
- EXPECT_EQ(1, getWorkerInputQueueSize());
- EXPECT_EQ(1, op1.getNumWorkOrders());
- EXPECT_EQ(0, op2.getNumWorkOrders());
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // This workorder's source should be op1.
- EXPECT_EQ(id1, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
- // Foreman hasn't yet got workorder completion response for the workorder.
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- // Send a message to Foreman upon workorder (generated by op1) completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- // op1 is over now, op2 still to go.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_EQ(1, getNumOperatorsFinished());
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
- // op1 is op2's blocking dependency.
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- // op2 should get first call of getAllWorkOrders() when op1 is over.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- // Send a message to Foreman upon workorder (generated by op2) completion.
-
- // Note that the worker hasn't yet popped the workorder. Usually this won't
- // happen as workers pop workorders first, execute and then send the response.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- // WorkOrder yet to be popped by the worker.
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
- EXPECT_EQ(2, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have come from op2.
- EXPECT_EQ(id2, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // Expect no additional calls to getAllWorkOrders.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, TwoNodesDAGPipeLinkTest) {
- // We use two nodes in the DAG with a non-blocking link between them.
- // We stream output of op1 to op2. Sequeuce of events is as follows:
- // 1. op1 creates a workorder.
- // 2. We send a "block full" (from op1) to Foreman.
- // 3. op2 creates a workorder because of step 2.
- const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
- ->setOutputRelationID(0xdead);
-
- const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- // Make sure queues are empty initially.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- startForeman();
-
- // As none of the operators have a blocking link, blocking dependencies should
- // be met.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op1.getNumWorkOrders());
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- // op2 will generate workorder only after receiving a streaming input.
- EXPECT_EQ(0, op2.getNumWorkOrders());
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // There should be one workorder sent to the worker so far.
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // This workorder's source be op1.
- EXPECT_EQ(id1, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- // Send a message to Foreman upon block getting full (output of op1).
- EXPECT_FALSE(placeOutputBlockMessage(id1));
-
- // op1 is not finished yet because the response of workorder completion hasn't
- // been received yet by the Foreman.
- EXPECT_FALSE(getOperatorFinishedStatus(id1));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- // No additional call to op1's getAllWorkOrders.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // Output from op1 should be fed to op2.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // A call to op2's getAllWorkOrders because of the streamed input.
- EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- // Place a message of a workorder completion of op1 on Foreman's input queue.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
- // An additional call to op2's getAllWorkOrders because of completion of op1.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(2, op2.getNumWorkOrders());
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Pop a workorder from Foreman's output queue.
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have been generated by op2.
- EXPECT_EQ(id2, work_order->getOpIndex());
- work_order->execute();
- delete work_order;
-
- // Place a message of a workorder completion of op2 on Foreman's input queue.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Pop a workorder from Foreman's output queue.
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have been generated by op2.
- EXPECT_EQ(id2, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- // Send a message to Foreman upon workorder (generated by op2) completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest) {
- // In this test, we create a 2-node DAG with a non-blocking link between them.
- // There is no streaming of data from op1 to op2 during the execution of op1.
- // op1 produces a partially filled block at the end of its execution which is
- // rebuilt and then fed to op2.
- const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- // Create a relation, owned by db_.
- CatalogRelation *relation = new CatalogRelation(nullptr /* catalog_database */, "test_relation");
- const relation_id output_relation_id = db_->addRelation(relation);
-
- // Setup the InsertDestination proto in the query context proto.
- serialization::QueryContext query_context_proto;
-
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto.insert_destinations_size();
- serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
-
- insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(output_relation_id);
- insert_destination_proto->set_relational_op_index(id1);
-
- MockOperator *op1_mutable =
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
- op1_mutable->setInsertDestinationID(insert_destination_index);
- op1_mutable->setOutputRelationID(output_relation_id);
-
- const MockOperator &op1 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator&>(query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
- foreman_->reconstructQueryContextFromProto(query_context_proto);
-
- // NOTE(zuyu): An operator generally has no ideas about partially filled
- // blocks, but InsertDestination in QueryContext does.
- // Mock to add partially filled blocks in the InsertDestination.
- InsertDestination *insert_destination =
- foreman_->query_context_->getInsertDestination(insert_destination_index);
- DCHECK(insert_destination != nullptr);
- MutableBlockReference block_ref;
- static_cast<BlockPoolInsertDestination*>(insert_destination)->available_block_refs_.push_back(move(block_ref));
-
- // Make sure queues are empty initially.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- startForeman();
-
- // There's no blocking dependency in the DAG.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op1.getNumWorkOrders());
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op2.getNumWorkOrders());
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have been generated by op1.
- EXPECT_EQ(id1, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // Send a message to Foreman upon workorder (generated by op1) completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-
- // op1 generates a rebuild workorder. The block is rebuilt and streamed
- // to Foreman.
- EXPECT_FALSE(placeDataPipelineMessage(id1));
-
- // Based on the streamed input, op2's getAllWorkOrders should produce a
- // workorder.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- // Worker receives a rebuild WorkOrder.
- MockWorkOrder *rebuild_op1;
- ASSERT_TRUE(popRebuildWorkOrderIfAvailable(&rebuild_op1));
- // We skip the check for relation ID of the rebuild WorkOrder, as the partially
- // filled block reference is a mock reference with garbage contents.
- delete rebuild_op1;
-
- EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Worker receives a WorkOrder.
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have been generated by op2.
- EXPECT_EQ(id2, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // Send a message to Foreman upon workorder (generated by op2) completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-TEST_F(ForemanTest, MultipleNodesNoOutputTest) {
- // When an operator produces workorders but no output, the Foreman should
- // check the dependents of this operator to make progress.
- const QueryPlan::DAGNodeIndex kNumNodes = 5;
- std::vector<QueryPlan::DAGNodeIndex> ids;
- ids.reserve(kNumNodes);
-
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- if (i == 0) {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
- } else {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
- }
- VLOG(3) << ids[i];
- }
-
- /**
- * The DAG looks like this:
- *
- * op1 -> op2 -> op3 -> op4 -> op5
- *
- **/
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
- query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
- ->setOutputRelationID(0xdead);
- }
-
- std::vector<const MockOperator*> operators;
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
- }
-
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
-
- // Make sure queues are empty initially.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- startForeman();
-
- // operators[0] should have produced a workorder by now.
- EXPECT_EQ(1, operators[0]->getNumWorkOrders());
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
- EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
-
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders));
- }
-
- // Worker receives a WorkOrder.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // The workorder should have been generated by operators[0].
- EXPECT_EQ(ids[0], work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- // Send a message to Foreman upon workorder (generated by operators[0])
- // completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
-
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
- EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
- if (i < kNumNodes - 1) {
- EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- }
- }
-}
-
-TEST_F(ForemanTest, OutOfOrderWorkOrderCompletionTest) {
- // Consider two operators, both generate one workorder each. The dependent's
- // workorder finishes before dependency's workorder.
- const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
- // There should be two workorders on Worker's private queue, for this test.
- foreman_->setMaxMessagesPerWorker(2);
-
- // Make sure queues are empty initially.
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- startForeman();
-
- // Expect one workorder produced by op1.
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Pop a workorder from Foreman's output queue.
- MockWorkOrder *work_order;
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // This workorder's source be op1.
- EXPECT_EQ(id1, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- // Send a message to Foreman upon a block (output of op1) getting full.
- EXPECT_FALSE(placeOutputBlockMessage(id1));
-
- // op1 is not finished yet because the response of workorder completion hasn't
- // been received yet.
- EXPECT_FALSE(getOperatorFinishedStatus(id1));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- // Expect one workorder produced by op2.
- EXPECT_EQ(1, getWorkerInputQueueSize());
-
- // Worker receives a WorkOrder.
- ASSERT_TRUE(popWorkOrderIfAvailable(&work_order));
- // This workorder's source should be op2.
- EXPECT_EQ(id2, work_order->getOpIndex());
-
- work_order->execute();
- delete work_order;
-
- // As mentioned earlier, op2 finishes before op1.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-
- // op1's workorder execution is over.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
- EXPECT_EQ(0, getWorkerInputQueueSize());
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 9ba5978..4f98748 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -228,8 +228,9 @@ class QueryManagerTest : public ::testing::Test {
db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
storage_manager_.reset(new StorageManager("./"));
bus_.Initialize();
- query_handle_.reset(new QueryHandle(0));
+ query_handle_.reset(new QueryHandle(0)); // dummy query ID.
query_plan_ = query_handle_->getQueryPlanMutable();
+ query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
}
inline void constructQueryManager() {
@@ -256,6 +257,7 @@ class QueryManagerTest : public ::testing::Test {
proto.set_block_id(0); // dummy block ID
proto.set_relation_id(0); // dummy relation ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -276,6 +278,7 @@ class QueryManagerTest : public ::testing::Test {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
@@ -297,6 +300,7 @@ class QueryManagerTest : public ::testing::Test {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker thread ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
@@ -320,6 +324,7 @@ class QueryManagerTest : public ::testing::Test {
proto.set_block_id(0); // dummy block ID
proto.set_relation_id(0); // dummy relation ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/WorkOrdersContainer_unittest.cpp b/query_execution/tests/WorkOrdersContainer_unittest.cpp
index cf133c4..cb583ab 100644
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@ -72,6 +72,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
// they get inserted and retrieved correctly.
std::vector<int> numa_node_ids;
// A container for one operator and no NUMA nodes.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -104,11 +105,15 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
ASSERT_TRUE(returned_work_order != nullptr);
EXPECT_EQ(work_order.getID(), static_cast<MockNUMAWorkOrder*>(returned_work_order)->getID());
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+
WorkOrder *returned_rebuild_work_order = w.getRebuildWorkOrder(0);
ASSERT_TRUE(returned_rebuild_work_order != nullptr);
EXPECT_EQ(work_order1.getID(),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
+
// Container should be empty now.
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@ -123,6 +128,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
// if they get inserted and retrieved correctly and the order of retrieval.
// A container for one operator and no NUMA nodes.
std::vector<int> numa_node_ids;
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -164,6 +170,8 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
ASSERT_TRUE(returned_work_order != nullptr);
EXPECT_EQ(static_cast<int>(kNumWorkOrders + i),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
}
// Container should be empty now.
@@ -190,6 +198,7 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
const std::size_t kNUMANodesUsed = numa_node_ids.size();
// A container for one operator and kNUMANodes.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, kNUMANodes);
for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
@@ -246,6 +255,9 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
ASSERT_TRUE(returned_rebuild_work_order != nullptr);
EXPECT_EQ(rebuild_workorders[i].getID(),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
}
// No workorder should be left for this operator on any NUMA node.
@@ -291,6 +303,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
const std::size_t kNUMANodesUsed = numa_nodes.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, kNUMANodes);
w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@ -331,6 +344,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
w.getNormalWorkOrderForNUMANode(0, numa_nodes[0]));
ASSERT_TRUE(observed_work_order != nullptr);
+ EXPECT_EQ(query_id, observed_work_order->getQueryID());
EXPECT_EQ(one_numa_work_order.getPreferredNUMANodes().front(),
observed_work_order->getPreferredNUMANodes().front());
EXPECT_EQ(one_numa_work_order.getID(), observed_work_order->getID());
@@ -348,6 +362,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
EXPECT_EQ(no_numa_work_order.getID(),
static_cast<MockNUMAWorkOrder *>(observed_non_numa_work_order)->getID());
+ EXPECT_EQ(query_id, observed_non_numa_work_order->getQueryID());
EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[0]));
EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[1]));
@@ -361,6 +376,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
ASSERT_TRUE(observed_work_order_multiple_numa_nodes != nullptr);
EXPECT_EQ(multiple_numa_work_order.getID(), observed_work_order_multiple_numa_nodes->getID());
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa_nodes->getQueryID());
std::vector<int> observed_numa_nodes(
observed_work_order_multiple_numa_nodes->getPreferredNUMANodes());
// Look up the expected numa nodes in the observed_numa_nodes vector.
@@ -427,6 +443,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
@@ -538,6 +555,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
curr_operator_id, single_numa_node_id));
ASSERT_TRUE(observed_work_order_single_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_single_numa =
normal_workorders_one_numa_ids[curr_operator_id];
@@ -550,6 +568,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
curr_operator_id, multiple_numa_node_id));
ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_multiple_numa =
normal_workorders_multiple_numa_ids[curr_operator_id];
@@ -562,6 +581,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
static_cast<MockNUMAWorkOrder *>(w.getNormalWorkOrder(curr_operator_id));
ASSERT_TRUE(observed_work_order_no_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_no_numa =
normal_workorders_no_numa_ids[curr_operator_id];
@@ -620,6 +640,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
@@ -732,6 +753,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
curr_operator_id, single_numa_node_id));
ASSERT_TRUE(observed_work_order_single_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_single_numa =
rebuild_workorders_one_numa_ids[curr_operator_id];
@@ -744,6 +766,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
curr_operator_id, multiple_numa_node_id));
ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_multiple_numa =
rebuild_workorders_multiple_numa_ids[curr_operator_id];
@@ -755,6 +778,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
MockNUMAWorkOrder *observed_work_order_no_numa =
static_cast<MockNUMAWorkOrder *>(w.getRebuildWorkOrder(curr_operator_id));
+ EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_no_numa =
rebuild_workorders_no_numa_ids[curr_operator_id];
@@ -772,6 +796,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
numa_node_ids.push_back(0);
const std::size_t kNumWorkOrdersPerType = 100;
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 2);
std::vector<int> single_numa_node_workorder_ids;
@@ -820,6 +845,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
MockNUMAWorkOrder *observed_work_order = static_cast<MockNUMAWorkOrder *>(
w.getNormalWorkOrder(0, prefer_single_NUMA_node));
ASSERT_TRUE(observed_work_order != nullptr);
+ EXPECT_EQ(query_id, observed_work_order->getQueryID());
if (prefer_single_NUMA_node) {
EXPECT_EQ(*single_numa_it, observed_work_order->getID());
EXPECT_EQ(1u, observed_work_order->getPreferredNUMANodes().size());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 0630bca..c7fd018 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -105,6 +105,7 @@ class ExecutionGenerator {
execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
execution_heuristics_(new ExecutionHeuristics()) {
+ query_context_proto_->set_query_id(query_handle_->query_id());
#ifdef QUICKSTEP_DISTRIBUTED
catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
#endif
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 6ef2a03..5b58f75 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -116,9 +116,11 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_Foreman
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Worker
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 56b53ba..ea871d0 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -24,7 +24,9 @@
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/Foreman.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
#include "query_optimizer/LogicalGenerator.hpp"
@@ -40,6 +42,8 @@
#include "glog/logging.h"
+#include "tmb/tagged_message.h"
+
namespace quickstep {
class CatalogRelation;
@@ -90,13 +94,18 @@ void ExecutionGeneratorTestRunner::runTestCase(
physical_generator.generatePlan(
logical_generator.generatePlan(*result.parsed_statement));
execution_generator.generatePlan(physical_plan);
- foreman_->setQueryPlan(
- query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
-
- foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
- foreman_->start();
- foreman_->join();
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id_,
+ foreman_->getBusClientID(),
+ &query_handle,
+ &bus_);
+
+ // Receive workload completion message from Foreman.
+ const AnnotatedMessage annotated_msg =
+ bus_.Receive(main_thread_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {