You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 19:26:38 UTC
[4/5] incubator-quickstep git commit: Long lived Foreman thread
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
new file mode 100644
index 0000000..1ee1df9
--- /dev/null
+++ b/query_execution/PolicyEnforcer.cpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Tries to start executing the given query.
+//
+// Returns true only when a QueryManager was created for the query. Returns
+// false either when the system is already running kMaxConcurrentQueries
+// queries (the handle is then appended to waiting_queries_) or when a query
+// with the same ID is already admitted (the handle is NOT queued in that
+// case; only an error is logged).
+bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  // No free slot: park the query until a running one finishes.
+  if (admitted_queries_.size() >= kMaxConcurrentQueries) {
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  // A slot is free, but query IDs must be unique among admitted queries.
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  admitted_queries_[query_id].reset(
+      new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                       catalog_database_, storage_manager_, bus_));
+  return true;
+}
+
+// Dispatches a message received by the Foreman to the QueryManager of the
+// query the message belongs to.
+//
+// The query ID is extracted from the message payload according to the message
+// type. If the QueryManager reports that the query has finished executing, the
+// query is removed and the earliest waiting query (if any) is offered for
+// admission.
+//
+// An unknown message type is fatal. A message for a query that is not
+// currently admitted trips a DCHECK in debug builds; in release builds it is
+// logged and dropped instead of dereferencing a null QueryManager.
+void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+  // TODO(harshad) : Provide processXMessage() public functions in
+  // QueryManager, so that we need to extract message from the
+  // TaggedMessage only once.
+  std::size_t query_id = 0;
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage:  // Fall through.
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::WorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      // TODO(harshad) Add query ID to FeedbackMessage.
+      // The FeedbackMessage constructor takes a non-const pointer, hence the
+      // const_cast; only the header is read here.
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+      query_id = msg.header().query_id;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+  }
+
+  // Look the query up exactly once. Using operator[] here would insert a null
+  // QueryManager for an unknown ID and then dereference it.
+  const auto query_it = admitted_queries_.find(query_id);
+  DCHECK(query_it != admitted_queries_.end());
+  if (query_it == admitted_queries_.end()) {
+    LOG(ERROR) << "Dropping message for query with ID " << query_id
+               << " which is not admitted";
+    return;
+  }
+
+  const QueryManager::QueryStatusCode return_code =
+      query_it->second->processMessage(tagged_message);
+  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+    // removeQuery() erases the entry, so query_it must not be used after it.
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+// Gathers WorkerMessages from every admitted query into *worker_messages,
+// which is expected to be empty on entry.
+//
+// Each admitted query may contribute at most
+// kMaxNumWorkerMessages / (number of admitted queries) messages per call.
+// A query that has no message to offer and whose execution has finished is
+// removed once the scan over all admitted queries is complete.
+void PolicyEnforcer::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  DCHECK(worker_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  const std::size_t per_query_share =
+      kMaxNumWorkerMessages / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  // Removal is deferred so that admitted_queries_ is not modified while it is
+  // being iterated.
+  std::vector<std::size_t> finished_queries_ids;
+
+  for (const auto &query_entry : admitted_queries_) {
+    QueryManager *manager = query_entry.second.get();
+    DCHECK(manager != nullptr);
+    for (std::size_t num_collected = 0;
+         num_collected < per_query_share;
+         ++num_collected) {
+      std::unique_ptr<WorkerMessage> message(
+          manager->getNextWorkerMessage(0, -1));
+      if (message == nullptr) {
+        // No more work orders from the current query at this time.
+        // If the query's execution is over, schedule it for removal.
+        if (manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          finished_queries_ids.push_back(query_entry.first);
+        }
+        break;
+      }
+      worker_messages->push_back(std::move(message));
+    }
+  }
+
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+// Removes the admitted query with the given ID, destroying its QueryManager.
+//
+// A warning is logged if the query has not finished executing. Passing an ID
+// that is not admitted trips a DCHECK in debug builds; in release builds it
+// is logged and ignored. The previous operator[]-based lookup would instead
+// have inserted a null QueryManager for the unknown ID and dereferenced it.
+void PolicyEnforcer::removeQuery(const std::size_t query_id) {
+  // Single lookup; the iterator is reused for both the check and the erase.
+  const auto query_it = admitted_queries_.find(query_id);
+  DCHECK(query_it != admitted_queries_.end());
+  if (query_it == admitted_queries_.end()) {
+    LOG(ERROR) << "Cannot remove query with ID " << query_id
+               << " which is not admitted";
+    return;
+  }
+
+  if (!query_it->second->getQueryExecutionState().hasQueryExecutionFinished()) {
+    LOG(WARNING) << "Removing query with ID " << query_id
+                 << " that hasn't finished its execution";
+  }
+  admitted_queries_.erase(query_it);
+}
+
+// Offers every query in query_handles to admitQuery(), in order.
+//
+// Returns true if all of them were admitted, false if at least one was not.
+// Every handle is processed even after one fails to be admitted: returning
+// at the first failure would silently drop the remaining handles, which
+// would then be neither admitted nor placed on the waiting queue.
+bool PolicyEnforcer::admitQueries(
+    const std::vector<QueryHandle *> &query_handles) {
+  bool all_queries_admitted = true;
+  for (QueryHandle *curr_query : query_handles) {
+    // admitQuery() itself queues the handle when no slot is free.
+    if (!admitQuery(curr_query)) {
+      all_queries_admitted = false;
+    }
+  }
+  return all_queries_admitted;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
new file mode 100644
index 0000000..d4ba643
--- /dev/null
+++ b/query_execution/PolicyEnforcer.hpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ * in sharing resources among concurrent queries.
+ *
+ * @note Admitted queries are tracked by query ID, each with its own
+ * QueryManager. Queries that cannot be admitted yet are kept in a FIFO
+ * queue of QueryHandle pointers.
+ **/
+class PolicyEnforcer {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @note The pointer arguments are stored as-is and forwarded to each
+ * QueryManager that gets created; the destructor below does not
+ * release them.
+ *
+ * @param foreman_client_id The TMB client ID of the Foreman.
+ * @param num_numa_nodes Number of NUMA nodes used by the system.
+ * @param catalog_database The CatalogDatabase used.
+ * @param storage_manager The StorageManager used.
+ * @param bus The TMB.
+ **/
+ PolicyEnforcer(const tmb::client_id foreman_client_id,
+ const std::size_t num_numa_nodes,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ tmb::MessageBus *bus)
+ : foreman_client_id_(foreman_client_id),
+ num_numa_nodes_(num_numa_nodes),
+ catalog_database_(catalog_database),
+ storage_manager_(storage_manager),
+ bus_(bus) {}
+
+ /**
+ * @brief Destructor. Logs a warning if any admitted or waiting query is
+ * still present.
+ **/
+ ~PolicyEnforcer() {
+ if (hasQueries()) {
+ LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
+ "waiting queries";
+ }
+ }
+
+ /**
+ * @brief Admit a query to the system.
+ *
+ * @note If the maximum number of concurrent queries is already running,
+ * the query is placed on the waiting queue and false is returned.
+ * False is also returned, without queueing, if a query with the
+ * same ID is already admitted.
+ *
+ * @param query_handle The QueryHandle for the new query.
+ *
+ * @return Whether the query was admitted to the system.
+ **/
+ bool admitQuery(QueryHandle *query_handle);
+
+ /**
+ * @brief Admit multiple queries in the system.
+ *
+ * @note In the current simple implementation, we only allow one active
+ * query in the system. Other queries will have to wait.
+ *
+ * @param query_handles A vector of QueryHandles for the queries to be
+ * admitted.
+ *
+ * @return True if all the queries were admitted, false if at least one query
+ * was not admitted.
+ **/
+ bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+ /**
+ * @brief Remove a given query that is under execution.
+ *
+ * @note This function is made public so that it is possible for a query
+ * to be killed. Otherwise, it should only be used privately by the
+ * class.
+ *
+ * TODO(harshad) - Extend this function to support removal of waiting queries.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
+ void removeQuery(const std::size_t query_id);
+
+ /**
+ * @brief Get worker messages to be dispatched. These worker messages come
+ * from the active queries.
+ *
+ * @param worker_messages The worker messages to be dispatched. Expected to
+ * be empty when passed in; filled by this call.
+ **/
+ void getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+ /**
+ * @brief Process a message sent to the Foreman, which gets passed on to the
+ * policy enforcer.
+ *
+ * @param tagged_message The message.
+ **/
+ void processMessage(const TaggedMessage &tagged_message);
+
+ /**
+ * @brief Check if there are any queries to be executed.
+ *
+ * @return True if there is at least one active or waiting query, false if
+ * the policy enforcer doesn't have any query.
+ **/
+ inline bool hasQueries() const {
+ return !(admitted_queries_.empty() && waiting_queries_.empty());
+ }
+
+ private:
+ // Upper bound on the number of simultaneously admitted queries.
+ static constexpr std::size_t kMaxConcurrentQueries = 1;
+ // Total number of worker messages handed out per getWorkerMessages() call,
+ // divided evenly among the admitted queries.
+ static constexpr std::size_t kMaxNumWorkerMessages = 20;
+
+ const tmb::client_id foreman_client_id_;
+ const std::size_t num_numa_nodes_;
+
+ CatalogDatabaseLite *catalog_database_;
+ StorageManager *storage_manager_;
+
+ tmb::MessageBus *bus_;
+
+ // Key = query ID, value = QueryManager* for the key query.
+ std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+ // The queries which haven't been admitted yet.
+ std::queue<QueryHandle*> waiting_queries_;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 3bfce17..5c4c44e 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -56,7 +56,8 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus) {
+ tmb::MessageBus *bus)
+ : query_id_(proto.query_id()) {
DCHECK(ProtoIsValid(proto, database))
<< "Attempted to create QueryContext from an invalid proto description:\n"
<< proto.DebugString();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7d5628d..83627bc 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -460,6 +460,13 @@ class QueryContext {
return update_groups_[id];
}
+  /**
+   * @brief Get the ID of this query.
+   *
+   * @return The query ID, as read from the serialized QueryContext this
+   *         object was constructed from.
+   **/
+  inline std::size_t getQueryID() const {
+    return query_id_;
+  }
+
private:
std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
@@ -472,6 +479,8 @@ class QueryContext {
std::vector<std::unique_ptr<Tuple>> tuples_;
std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
+ const std::size_t query_id_;
+
DISALLOW_COPY_AND_ASSIGN(QueryContext);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b37286c..98cd0b6 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -54,4 +54,6 @@ message QueryContext {
// NOTE(zuyu): For UpdateWorkOrder only.
repeated UpdateGroup update_groups = 10;
+
+ required uint64 query_id = 11;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 15803cf..9d9a9e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -27,6 +27,7 @@ message EmptyMessage {
message WorkOrderCompletionMessage {
required uint64 operator_index = 1;
required uint64 worker_thread_index = 2;
+ // ID of the query to which the completed WorkOrder belongs.
+ required uint64 query_id = 3;
}
message CatalogRelationNewBlockMessage {
@@ -35,16 +36,19 @@ message CatalogRelationNewBlockMessage {
// Used by PartitionAwareInsertDestination.
optional uint64 partition_id = 3;
+ required uint64 query_id = 4;
}
message DataPipelineMessage {
required uint64 operator_index = 1;
required fixed64 block_id = 2;
required int32 relation_id = 3;
+ // ID of the query this message belongs to; the receiver uses it to pick
+ // the QueryManager that should handle the message.
+ required uint64 query_id = 4;
}
message WorkOrdersAvailableMessage {
required uint64 operator_index = 1;
+ // ID of the query this message belongs to; the receiver uses it to pick
+ // the QueryManager that should handle the message.
+ required uint64 query_id = 2;
}
// BlockLocator related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fc253bc..fa97158 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -58,6 +58,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
+ kAdmitRequestMessage, // Requesting a query (or queries) to be admitted.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
@@ -67,7 +68,8 @@ enum QueryExecutionMessageType : message_type_id {
// their corresponding RelationalOperators.
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
- kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers.
+ kWorkloadCompletionMessage, // From Foreman to main thread.
+ kPoisonMessage, // From the main thread to Foreman and workers.
#ifdef QUICKSTEP_DISTRIBUTED
// BlockLocator related messages, sorted in a life cycle of StorageManager
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index a8b6a38..78fd159 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -17,9 +17,12 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
+#include <memory>
#include <utility>
+#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
#include "utility/Macros.hpp"
#include "tmb/address.h"
@@ -60,6 +63,55 @@ class QueryExecutionUtil {
std::move(tagged_message));
}
+ /**
+ * @brief Construct and send an AdmitRequestMessage from a given sender to a
+ * given recipient.
+ *
+ * @param sender_id The TMB client ID of the sender.
+ * @param receiver_id The TMB client ID of the receiver.
+ * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+ * @param bus A pointer to the TMB.
+ *
+ * @return A status code indicating the result of the message delivery.
+ * The caller should ensure that the status is SendStatus::kOK.
+ **/
+ static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+ const tmb::client_id sender_id,
+ const tmb::client_id receiver_id,
+ QueryHandle *query_handle,
+ MessageBus *bus) {
+ std::unique_ptr<AdmitRequestMessage> request_message(
+ new AdmitRequestMessage(query_handle));
+ const std::size_t size_of_request_msg = sizeof(*request_message);
+ // NOTE(review): ownership of the AdmitRequestMessage is given up via
+ // release() and is not reclaimed anywhere in this function. Whether
+ // TaggedMessage copies these bytes or adopts the pointer is not visible
+ // here - confirm that this is intentional and who frees the object.
+ TaggedMessage admit_tagged_message(
+ request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+ return QueryExecutionUtil::SendTMBMessage(
+ bus, sender_id, receiver_id, std::move(admit_tagged_message));
+ }
+
+  /**
+   * @brief Broadcast a poison message to all TMB clients in order to
+   *        terminate the Foreman and the worker threads.
+   *
+   * @note The process is aborted (CHECK failure) if the send does not report
+   *       SendStatus::kOK.
+   *
+   * @param sender_id The TMB client ID of the sender.
+   * @param bus A pointer to the TMB.
+   **/
+  static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
+    // Address every client and mark the message as a broadcast. Each worker
+    // dies after receiving the poison message; the order in which the
+    // workers die is irrelevant.
+    Address all_clients;
+    all_clients.All(true);
+    MessageStyle broadcast_style;
+    broadcast_style.Broadcast(true);
+
+    std::unique_ptr<WorkerMessage> poison(WorkerMessage::PoisonMessage());
+    TaggedMessage tagged_poison(poison.get(), sizeof(*poison), kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus status = bus->Send(
+        sender_id, all_clients, broadcast_style, std::move(tagged_poison));
+    CHECK(status == tmb::MessageBus::SendStatus::kOK)
+        << "Broadcast poison message from sender with TMB client ID "
+        << sender_id << " failed";
+  }
+
private:
/**
* @brief Constructor. Made private to avoid instantiation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
index 47f54c5..b52460f 100644
--- a/query_execution/QueryManager.hpp
+++ b/query_execution/QueryManager.hpp
@@ -25,18 +25,21 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/DAG.hpp"
#include "utility/Macros.hpp"
+#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
namespace quickstep {
class CatalogDatabaseLite;
-class ForemanMessage;
class QueryHandle;
class StorageManager;
class WorkerMessage;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index eb9aedd..6ed6474 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -28,6 +28,8 @@
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
+#include "glog/logging.h"
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -76,7 +78,7 @@ class WorkOrdersContainer {
* @return If there are pending WorkOrders.
**/
inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return normal_workorders_[operator_index].hasWorkOrder();
}
@@ -92,9 +94,9 @@ class WorkOrdersContainer {
**/
inline bool hasNormalWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
@@ -108,7 +110,7 @@ class WorkOrdersContainer {
* @return If there are pending rebuild WorkOrders.
**/
inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return rebuild_workorders_[operator_index].hasWorkOrder();
}
@@ -124,9 +126,9 @@ class WorkOrdersContainer {
**/
inline bool hasRebuildWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
@@ -144,9 +146,9 @@ class WorkOrdersContainer {
**/
WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return normal_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
@@ -164,7 +166,7 @@ class WorkOrdersContainer {
**/
WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return normal_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
@@ -182,9 +184,9 @@ class WorkOrdersContainer {
**/
WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
@@ -202,7 +204,7 @@ class WorkOrdersContainer {
**/
WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return rebuild_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
@@ -220,8 +222,8 @@ class WorkOrdersContainer {
* @param operator_index The index of the operator in the query DAG.
**/
void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK(operator_index < num_operators_);
normal_workorders_[operator_index].addWorkOrder(workorder);
}
@@ -238,8 +240,8 @@ class WorkOrdersContainer {
**/
void addRebuildWorkOrder(WorkOrder *workorder,
const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK(operator_index < num_operators_);
rebuild_workorders_[operator_index].addWorkOrder(workorder);
}
@@ -254,9 +256,9 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumNormalWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
@@ -271,7 +273,7 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumNormalWorkOrders(
const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return normal_workorders_[operator_index].getNumWorkOrders();
}
@@ -286,9 +288,9 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumRebuildWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
- DEBUG_ASSERT(operator_index < num_operators_);
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK(operator_index < num_operators_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
@@ -303,7 +305,7 @@ class WorkOrdersContainer {
**/
inline std::size_t getNumRebuildWorkOrders(
const std::size_t operator_index) const {
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(operator_index < num_operators_);
return rebuild_workorders_[operator_index].getNumWorkOrders();
}
@@ -418,8 +420,8 @@ class WorkOrdersContainer {
void addWorkOrder(WorkOrder *workorder);
bool hasWorkOrderForNUMANode(const int numa_node_id) const {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
numa_node_id);
@@ -440,8 +442,8 @@ class WorkOrdersContainer {
std::size_t getNumWorkOrdersForNUMANode(
const int numa_node_id) const {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
numa_node_id);
@@ -463,8 +465,8 @@ class WorkOrdersContainer {
}
WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
- DEBUG_ASSERT(numa_node_id >= 0);
- DEBUG_ASSERT(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
+ DCHECK_GE(numa_node_id, 0);
+ DCHECK(static_cast<std::size_t>(numa_node_id) < num_numa_nodes_);
WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
if (work_order == nullptr) {
work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 645fd05..ef596e1 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -58,11 +58,14 @@ void Worker::run() {
WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
DCHECK(message.getWorkOrder() != nullptr);
message.getWorkOrder()->execute();
+ const std::size_t query_id_for_workorder =
+ message.getWorkOrder()->getQueryID();
delete message.getWorkOrder();
- sendWorkOrderCompleteMessage(annotated_msg.sender,
- message.getRelationalOpIndex(),
- tagged_message.message_type() == kRebuildWorkOrderMessage);
+ sendWorkOrderCompleteMessage(
+ annotated_msg.sender, message.getRelationalOpIndex(),
+ query_id_for_workorder,
+ tagged_message.message_type() == kRebuildWorkOrderMessage);
break;
}
case kPoisonMessage: {
@@ -76,10 +79,12 @@ void Worker::run() {
void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
const size_t op_index,
+ const size_t query_id,
const bool is_rebuild_work_order) {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(op_index);
proto.set_worker_thread_index(worker_thread_index_);
+ proto.set_query_id(query_id);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index b94e937..c0bafdc 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -97,11 +97,13 @@ class Worker : public Thread {
*
* @param receiver The id of the TMB client which should receive the response.
* @param op_index The index of the operator to which the WorkOrder belongs.
+ * @param query_id The ID of the query which the WorkOrder belongs to.
* @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
* false.
**/
void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
const std::size_t op_index,
+ const std::size_t query_id,
const bool is_rebuild_work_order);
const std::size_t worker_thread_index_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index ec63af9..7adf2d3 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -105,6 +105,23 @@ class WorkerMessage {
return type_;
}
+ /**
+ * @brief Set a hint for the recipient worker thread.
+ *
+ * @note The hint is initialized to -1 by the constructor, which
+ * getRecipientHint() documents as "invalid".
+ *
+ * @param recipient_index_hint The hint i.e. the worker thread index.
+ **/
+ inline void setRecipientHint(const int recipient_index_hint) {
+ recipient_index_hint_ = recipient_index_hint;
+ }
+
+ /**
+ * @brief Get the hint for the recipient worker thread. The hint is invalid if
+ * it is -1.
+ *
+ * @return The worker thread index last set through setRecipientHint(), or
+ * -1 if no hint has been set.
+ **/
+ inline int getRecipientHint() const {
+ return recipient_index_hint_;
+ }
+
private:
/**
* @brief Constructor.
@@ -120,12 +137,13 @@ class WorkerMessage {
const WorkerMessageType type)
: work_unit_(work_unit),
relational_op_index_(relational_op_index),
- type_(type) {
- }
+ type_(type),
+ recipient_index_hint_(-1) {}
WorkOrder *work_unit_;
const std::size_t relational_op_index_;
const WorkerMessageType type_;
+ int recipient_index_hint_;
};
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 308d5ca..62642ac 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -250,6 +250,7 @@ class QueryManagerTest : public ::testing::Test {
proto.set_block_id(0); // dummy block ID
proto.set_relation_id(0); // dummy relation ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -270,6 +271,7 @@ class QueryManagerTest : public ::testing::Test {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
@@ -291,6 +293,7 @@ class QueryManagerTest : public ::testing::Test {
serialization::WorkOrderCompletionMessage proto;
proto.set_operator_index(index);
proto.set_worker_thread_index(1); // dummy worker thread ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t proto_length = proto.ByteSize();
@@ -314,6 +317,7 @@ class QueryManagerTest : public ::testing::Test {
proto.set_block_id(0); // dummy block ID
proto.set_relation_id(0); // dummy relation ID.
+ proto.set_query_id(0); // dummy query ID.
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -712,11 +716,13 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext *query_context_proto =
query_handle_->getQueryContextProtoMutable();
+ query_context_proto->set_query_id(0); // dummy query ID.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto->insert_destinations_size();
serialization::InsertDestination *insert_destination_proto =
query_context_proto->add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto->query_id());
insert_destination_proto->set_insert_destination_type(
serialization::InsertDestinationType::BLOCK_POOL);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/WorkOrdersContainer_unittest.cpp b/query_execution/tests/WorkOrdersContainer_unittest.cpp
index cf133c4..cb583ab 100644
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@ -72,6 +72,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
// they get inserted and retrieved correctly.
std::vector<int> numa_node_ids;
// A container for one operator and no NUMA nodes.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -104,11 +105,15 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) {
ASSERT_TRUE(returned_work_order != nullptr);
EXPECT_EQ(work_order.getID(), static_cast<MockNUMAWorkOrder*>(returned_work_order)->getID());
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+
WorkOrder *returned_rebuild_work_order = w.getRebuildWorkOrder(0);
ASSERT_TRUE(returned_rebuild_work_order != nullptr);
EXPECT_EQ(work_order1.getID(),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
+
// Container should be empty now.
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@ -123,6 +128,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
// if they get inserted and retrieved correctly and the order of retrieval.
// A container for one operator and no NUMA nodes.
std::vector<int> numa_node_ids;
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
@@ -164,6 +170,8 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) {
ASSERT_TRUE(returned_work_order != nullptr);
EXPECT_EQ(static_cast<int>(kNumWorkOrders + i),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
}
// Container should be empty now.
@@ -190,6 +198,7 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
const std::size_t kNUMANodesUsed = numa_node_ids.size();
// A container for one operator and kNUMANodes.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, kNUMANodes);
for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
@@ -246,6 +255,9 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) {
ASSERT_TRUE(returned_rebuild_work_order != nullptr);
EXPECT_EQ(rebuild_workorders[i].getID(),
static_cast<MockNUMAWorkOrder *>(returned_rebuild_work_order)->getID());
+
+ EXPECT_EQ(query_id, returned_work_order->getQueryID());
+ EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID());
}
// No workorder should be left for this operator on any NUMA node.
@@ -291,6 +303,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
const std::size_t kNUMANodesUsed = numa_nodes.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, kNUMANodes);
w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@ -331,6 +344,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
w.getNormalWorkOrderForNUMANode(0, numa_nodes[0]));
ASSERT_TRUE(observed_work_order != nullptr);
+ EXPECT_EQ(query_id, observed_work_order->getQueryID());
EXPECT_EQ(one_numa_work_order.getPreferredNUMANodes().front(),
observed_work_order->getPreferredNUMANodes().front());
EXPECT_EQ(one_numa_work_order.getID(), observed_work_order->getID());
@@ -348,6 +362,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
EXPECT_EQ(no_numa_work_order.getID(),
static_cast<MockNUMAWorkOrder *>(observed_non_numa_work_order)->getID());
+ EXPECT_EQ(query_id, observed_non_numa_work_order->getQueryID());
EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[0]));
EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[1]));
@@ -361,6 +376,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) {
ASSERT_TRUE(observed_work_order_multiple_numa_nodes != nullptr);
EXPECT_EQ(multiple_numa_work_order.getID(), observed_work_order_multiple_numa_nodes->getID());
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa_nodes->getQueryID());
std::vector<int> observed_numa_nodes(
observed_work_order_multiple_numa_nodes->getPreferredNUMANodes());
// Look up the expected numa nodes in the observed_numa_nodes vector.
@@ -427,6 +443,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
@@ -538,6 +555,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
curr_operator_id, single_numa_node_id));
ASSERT_TRUE(observed_work_order_single_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_single_numa =
normal_workorders_one_numa_ids[curr_operator_id];
@@ -550,6 +568,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
curr_operator_id, multiple_numa_node_id));
ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_multiple_numa =
normal_workorders_multiple_numa_ids[curr_operator_id];
@@ -562,6 +581,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) {
static_cast<MockNUMAWorkOrder *>(w.getNormalWorkOrder(curr_operator_id));
ASSERT_TRUE(observed_work_order_no_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_no_numa =
normal_workorders_no_numa_ids[curr_operator_id];
@@ -620,6 +640,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
@@ -732,6 +753,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
curr_operator_id, single_numa_node_id));
ASSERT_TRUE(observed_work_order_single_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_single_numa =
rebuild_workorders_one_numa_ids[curr_operator_id];
@@ -744,6 +766,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
curr_operator_id, multiple_numa_node_id));
ASSERT_TRUE(observed_work_order_multiple_numa != nullptr);
+ EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_multiple_numa =
rebuild_workorders_multiple_numa_ids[curr_operator_id];
@@ -755,6 +778,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) {
MockNUMAWorkOrder *observed_work_order_no_numa =
static_cast<MockNUMAWorkOrder *>(w.getRebuildWorkOrder(curr_operator_id));
+ EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID());
// Verify if the workorder ID is correct.
const int expected_workorder_id_no_numa =
rebuild_workorders_no_numa_ids[curr_operator_id];
@@ -772,6 +796,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
numa_node_ids.push_back(0);
const std::size_t kNumWorkOrdersPerType = 100;
+ const std::size_t query_id = 0;
WorkOrdersContainer w(1, 2);
std::vector<int> single_numa_node_workorder_ids;
@@ -820,6 +845,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) {
MockNUMAWorkOrder *observed_work_order = static_cast<MockNUMAWorkOrder *>(
w.getNormalWorkOrder(0, prefer_single_NUMA_node));
ASSERT_TRUE(observed_work_order != nullptr);
+ EXPECT_EQ(query_id, observed_work_order->getQueryID());
if (prefer_single_NUMA_node) {
EXPECT_EQ(*single_numa_it, observed_work_order->getID());
EXPECT_EQ(1u, observed_work_order->getPreferredNUMANodes().size());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index f9fd742..075d270 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -401,7 +401,7 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
S::InsertDestination *insert_destination_proto =
- query_context_proto_->add_insert_destinations();
+ addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_sample,
&output_relation,
insert_destination_proto);
@@ -513,7 +513,7 @@ void ExecutionGenerator::convertSelection(
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_selection,
&output_relation,
insert_destination_proto);
@@ -753,7 +753,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_plan,
&output_relation,
insert_destination_proto);
@@ -879,7 +879,7 @@ void ExecutionGenerator::convertNestedLoopsJoin(
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_plan,
&output_relation,
insert_destination_proto);
@@ -926,7 +926,7 @@ void ExecutionGenerator::convertCopyFrom(
// Create InsertDestination proto.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation->getID());
@@ -1152,7 +1152,7 @@ void ExecutionGenerator::convertInsertTuple(
// Create InsertDestination proto.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(input_relation.getID());
@@ -1209,7 +1209,7 @@ void ExecutionGenerator::convertInsertSelection(
// Create InsertDestination proto.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(destination_relation.getID());
@@ -1280,7 +1280,7 @@ void ExecutionGenerator::convertUpdateTable(
// Create InsertDestination proto.
const QueryContext::insert_destination_id relocation_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *relocation_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *relocation_destination_proto = addNewInsertDestinationToQueryContext();
relocation_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
relocation_destination_proto->set_relation_id(input_rel_id);
@@ -1446,7 +1446,7 @@ void ExecutionGenerator::convertAggregate(
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+ S::InsertDestination *insert_destination_proto = addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_plan,
&output_relation,
insert_destination_proto);
@@ -1496,7 +1496,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
const QueryContext::insert_destination_id initial_runs_destination_id =
query_context_proto_->insert_destinations_size();
S::InsertDestination *initial_runs_destination_proto =
- query_context_proto_->add_insert_destinations();
+ addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(
physical_sort, &initial_runs_relation, initial_runs_destination_proto);
@@ -1542,7 +1542,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
const QueryContext::insert_destination_id merged_runs_destination_id =
query_context_proto_->insert_destinations_size();
S::InsertDestination *merged_runs_destination_proto =
- query_context_proto_->add_insert_destinations();
+ addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_sort,
&merged_runs_relation,
merged_runs_destination_proto);
@@ -1550,7 +1550,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
const QueryContext::insert_destination_id sorted_output_destination_id =
query_context_proto_->insert_destinations_size();
S::InsertDestination *sorted_output_destination_proto =
- query_context_proto_->add_insert_destinations();
+ addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_sort,
&sorted_relation,
sorted_output_destination_proto);
@@ -1606,7 +1606,7 @@ void ExecutionGenerator::convertTableGenerator(
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
S::InsertDestination *insert_destination_proto =
- query_context_proto_->add_insert_destinations();
+ addNewInsertDestinationToQueryContext();
createTemporaryCatalogRelation(physical_tablegen,
&output_relation,
insert_destination_proto);
@@ -1635,5 +1635,11 @@ void ExecutionGenerator::convertTableGenerator(
temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
}
+S::InsertDestination* ExecutionGenerator::addNewInsertDestinationToQueryContext() {
+ S::InsertDestination *insert_destination_proto(query_context_proto_->add_insert_destinations());
+ insert_destination_proto->set_query_id(query_context_proto_->query_id());
+ return insert_destination_proto;
+}
+
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 0630bca..c453f7a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -105,6 +105,7 @@ class ExecutionGenerator {
execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
execution_heuristics_(new ExecutionHeuristics()) {
+ query_context_proto_->set_query_id(query_handle_->query_id());
#ifdef QUICKSTEP_DISTRIBUTED
catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
#endif
@@ -374,6 +375,14 @@ class ExecutionGenerator {
*/
void dropAllTemporaryRelations();
+ /**
+ * @brief Add a new InsertDestination to the QueryContext and set the
+ * query ID for the InsertDestination.
+ *
+ * @return A pointer to the serialized InsertDestination.
+ **/
+ serialization::InsertDestination* addNewInsertDestinationToQueryContext();
+
OptimizerContext *optimizer_context_;
QueryHandle *query_handle_;
QueryPlan *execution_plan_; // A part of QueryHandle.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 6ef2a03..5b58f75 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -116,9 +116,11 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_Foreman
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Worker
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 56b53ba..930087a 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -24,7 +24,9 @@
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/Foreman.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/Worker.hpp"
#include "query_optimizer/ExecutionGenerator.hpp"
#include "query_optimizer/LogicalGenerator.hpp"
@@ -40,6 +42,8 @@
#include "glog/logging.h"
+#include "tmb/tagged_message.h"
+
namespace quickstep {
class CatalogRelation;
@@ -90,13 +94,20 @@ void ExecutionGeneratorTestRunner::runTestCase(
physical_generator.generatePlan(
logical_generator.generatePlan(*result.parsed_statement));
execution_generator.generatePlan(physical_plan);
- foreman_->setQueryPlan(
- query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
-
- foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
- foreman_->start();
- foreman_->join();
+ AdmitRequestMessage request_message(&query_handle);
+ TaggedMessage admit_tagged_message(
+ &request_message, sizeof(request_message), kAdmitRequestMessage);
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ main_thread_client_id_,
+ foreman_->getBusClientID(),
+ std::move(admit_tagged_message));
+
+ // Receive workload completion message from Foreman.
+ const AnnotatedMessage annotated_msg =
+ bus_.Receive(main_thread_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index 8352d55..9204073 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -61,9 +61,11 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
bus_.Initialize();
- foreman_.reset(new Foreman(&bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ main_thread_client_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
+ bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);
+
worker_.reset(new Worker(0, &bus_));
std::vector<client_id> worker_client_ids;
@@ -75,27 +77,20 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_->setWorkerDirectory(workers_.get());
+ foreman_.reset(new Foreman(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
+ foreman_->start();
worker_->start();
}
~ExecutionGeneratorTestRunner() {
- std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
- TaggedMessage poison_tagged_message(poison_message.get(),
- sizeof(*poison_message),
- quickstep::kPoisonMessage);
-
- Address worker_address;
- MessageStyle single_receiver_style;
-
- worker_address.AddRecipient(worker_->getBusClientID());
- bus_.Send(foreman_->getBusClientID(),
- worker_address,
- single_receiver_style,
- std::move(poison_tagged_message));
-
+ QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_);
worker_->join();
+ foreman_->join();
}
void runTestCase(const std::string &input,
@@ -112,6 +107,8 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
std::unique_ptr<WorkerDirectory> workers_;
+ tmb::client_id main_thread_client_id_;
+
// This map is needed for InsertDestination and some operators that send
// messages to Foreman directly. To know the reason behind the design of this
// map, see the note in InsertDestination.hpp.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/query_optimizer/tests/TestDatabaseLoader.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.cpp b/query_optimizer/tests/TestDatabaseLoader.cpp
index 2de69b6..764ff2f 100644
--- a/query_optimizer/tests/TestDatabaseLoader.cpp
+++ b/query_optimizer/tests/TestDatabaseLoader.cpp
@@ -122,6 +122,7 @@ void TestDatabaseLoader::loadTestRelation() {
nullptr,
&storage_manager_,
0 /* dummy op index */,
+ 0, // dummy query ID.
scheduler_client_id_,
&bus_);
int sign = 1;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index f340d4e..6a8590a 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -111,6 +111,8 @@ class AggregationWorkOrder : public WorkOrder {
*
* @param query_id The ID of this query.
* @param input_block_id The block id.
+ * @param query_id The ID of this query.
+ * @param input_block_id The block id.
* @param state The AggregationState to use.
**/
AggregationWorkOrder(const std::size_t query_id,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 15dc9e3..ec3bc20 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -96,6 +96,7 @@ void DeleteWorkOrder::execute() {
proto.set_operator_index(delete_operator_index_);
proto.set_block_id(input_block_id_);
proto.set_relation_id(input_relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index c55f585..fdc9b00 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -162,6 +162,7 @@ class DeleteWorkOrder : public WorkOrder {
StorageManager *storage_manager_;
const std::size_t delete_operator_index_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/DestroyHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 7d8acb7..b7fe1ac 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -48,6 +48,8 @@ class DestroyHashOperator : public RelationalOperator {
*
* @param query_id The ID of the query to which this operator belongs.
* @param hash_table_index The index of the JoinHashTable in QueryContext.
+ * @param query_id The ID of the query to which this operator belongs.
+ * @param hash_table_index The index of the JoinHashTable in QueryContext.
**/
DestroyHashOperator(const std::size_t query_id,
const QueryContext::join_hash_table_id hash_table_index)
@@ -80,6 +82,8 @@ class DestroyHashWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param hash_table_index The index of the JoinHashTable in QueryContext.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
+ * @param hash_table_index The index of the JoinHashTable in QueryContext.
* @param query_context The QueryContext to use.
**/
DestroyHashWorkOrder(const std::size_t query_id,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index a0a8d6e..0cdb733 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -57,6 +57,8 @@ class DropTableOperator : public RelationalOperator {
*
* @param query_id The ID of the query to which this operator belongs.
* @param relation The relation to drop.
+ * @param query_id The ID of the query to which this operator belongs.
+ * @param relation The relation to drop.
* @param database The database where to drop \c relation.
* @param only_drop_blocks If true, only drop the blocks belonging to \c
* relation, but leave \c relation in \c database.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 1d5d4e3..d2d3e74 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -631,6 +631,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
* is using attributes from the build relation as input. Note that the
* length of this vector should equal the length of \p selection.
* @param lookup_block_id The block id of the probe_relation.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
**/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 86f8eaf..3125447 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -85,6 +85,7 @@ class RebuildWorkOrder : public WorkOrder {
proto.set_operator_index(input_operator_index_);
proto.set_block_id(block_ref_->getID());
proto.set_relation_id(input_relation_id_);
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 6bf5719..40fde13 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -259,6 +259,7 @@ void SortMergeRunWorkOrder::execute() {
// Send completion message to operator.
FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage,
+ getQueryID(),
operator_index_,
serialized_output.first,
serialized_output.second);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 6a6af4b..c8adb9a 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -115,6 +115,8 @@ class TableGeneratorWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param generator_function The GeneratorFunctionHandle to use.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
+ * @param generator_function The GeneratorFunctionHandle to use.
* @param output_destination The InsertDestination to insert the generated
* output.
**/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 2130563..7a99000 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -91,6 +91,7 @@ void UpdateWorkOrder::execute() {
proto.set_operator_index(update_operator_index_);
proto.set_block_id(input_block_id_);
proto.set_relation_id(relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index cebb9b5..b4f9b9d 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -181,6 +181,7 @@ class UpdateWorkOrder : public WorkOrder {
StorageManager *storage_manager_;
const std::size_t update_operator_index_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 059865d..df195cc 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -65,20 +65,25 @@ class WorkOrder {
* relational operator.
*/
struct FeedbackMessageHeader {
+ std::size_t query_id;
std::size_t rel_op_index;
std::size_t payload_size;
FeedbackMessageType payload_type;
/**
* @brief Header constructor.
+ *
+ * @param query_id The ID of the query.
* @param relational_op_index Index of the relation operator.
* @param payload_size Size of the payload of the message.
* @param payload_type Type of payload.
*/
- FeedbackMessageHeader(const std::size_t relational_op_index,
+ FeedbackMessageHeader(const std::size_t query_id,
+ const std::size_t relational_op_index,
const std::size_t payload_size,
const FeedbackMessageType payload_type)
- : rel_op_index(relational_op_index),
+ : query_id(query_id),
+ rel_op_index(relational_op_index),
payload_size(payload_size),
payload_type(payload_type) {}
};
@@ -93,17 +98,19 @@ class WorkOrder {
* @brief Feedback message constructor.
*
* @param type Type of the message.
+ * @param query_id The ID of the query.
* @param rel_op_index Relational operator index.
* @param payload Blob of payload.
* @param payload_size Size of the payload blob.
* @param ownership Whether to take ownership of the payload blob.
*/
FeedbackMessage(const FeedbackMessageType type,
+ const std::size_t query_id,
const std::size_t rel_op_index,
void *payload,
const std::size_t payload_size,
const bool ownership = true)
- : header_(rel_op_index, payload_size, type),
+ : header_(query_id, rel_op_index, payload_size, type),
payload_(payload),
ownership_(ownership) {}
@@ -285,6 +292,13 @@ class WorkOrder {
" receiver thread with TMB client ID " << receiver_id;
}
+ /**
+ * @brief Get the ID of the query which this WorkOrder belongs to.
+ **/
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
protected:
/**
* @brief Constructor.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index fdcc54f..27d974d 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -228,6 +228,8 @@ class AggregationOperatorTest : public ::testing::Test {
// Setup the aggregation state proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
+
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
aggr_state_proto->set_relation_id(table_->getID());
@@ -277,6 +279,7 @@ class AggregationOperatorTest : public ::testing::Test {
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_->getID());
@@ -319,6 +322,8 @@ class AggregationOperatorTest : public ::testing::Test {
// Setup the aggregation state proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
+
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
aggr_state_proto->set_relation_id(table_->getID());
@@ -362,6 +367,8 @@ class AggregationOperatorTest : public ::testing::Test {
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_->getID());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 074b603..8fee50d 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -294,6 +294,7 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -355,6 +356,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
@@ -434,6 +436,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -503,6 +506,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
@@ -604,6 +608,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -660,6 +665,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
@@ -739,6 +745,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -801,6 +808,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
@@ -906,6 +914,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -973,6 +982,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
@@ -1083,6 +1093,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -1150,6 +1161,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
const QueryContext::insert_destination_id output_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(output_relation_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46fb7ae 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -1269,9 +1271,12 @@ class SortMergeRunOperatorTest : public ::testing::Test {
ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
+ query_context_proto_.set_query_id(0); // dummy query ID.
+
// Setup the InsertDestination proto in the query context proto.
insert_destination_index_ = query_context_proto_.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto_.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_id);
@@ -1291,6 +1296,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
run_destination_index_ = query_context_proto_.insert_destinations_size();
insert_destination_proto = query_context_proto_.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto_.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(run_table_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21bc4c71/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..3eeb7e9 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,10 +328,12 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
const std::vector<bool> &null_ordering) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_->getID());