You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/08 20:49:15 UTC
[16/16] incubator-quickstep git commit: Merge branch
'reorder-query-id-param' into query-manager-used-in-foreman
Merge branch 'reorder-query-id-param' into query-manager-used-in-foreman
Conflicts:
query_execution/Foreman.cpp
query_execution/QueryManager.cpp
query_execution/tests/Foreman_unittest.cpp
relational_operators/RebuildWorkOrder.hpp
relational_operators/UpdateOperator.cpp
relational_operators/UpdateOperator.hpp
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e8ead861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e8ead861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e8ead861
Branch: refs/heads/query-manager-used-in-foreman
Commit: e8ead86103341a34ac7449ed416d1dbba67496a7
Parents: bef0ae1 d67f61e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 8 15:46:28 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 8 15:46:28 2016 -0500
----------------------------------------------------------------------
cli/QuickstepCli.cpp | 1 -
query_execution/AdmitRequestMessage.hpp | 2 -
query_execution/Foreman.cpp | 3 -
query_execution/QueryExecutionUtil.hpp | 6 +-
query_execution/QueryManager.cpp | 14 +-
query_execution/WorkOrdersContainer.hpp | 10 +-
query_execution/tests/Foreman_unittest.cpp | 945 +++++++++++++++++++
query_execution/tests/QueryManager_unittest.cpp | 2 +-
.../tests/WorkOrdersContainer_unittest.cpp | 18 +-
query_optimizer/ExecutionGenerator.cpp | 174 ++--
.../tests/ExecutionHeuristics_unittest.cpp | 25 +-
relational_operators/AggregationOperator.cpp | 12 +-
relational_operators/AggregationOperator.hpp | 14 +-
relational_operators/BuildHashOperator.cpp | 4 +-
relational_operators/BuildHashOperator.hpp | 21 +-
relational_operators/CreateIndexOperator.hpp | 10 +-
relational_operators/CreateTableOperator.hpp | 10 +-
relational_operators/DeleteOperator.cpp | 8 +-
relational_operators/DeleteOperator.hpp | 18 +-
relational_operators/DestroyHashOperator.cpp | 5 +-
relational_operators/DestroyHashOperator.hpp | 14 +-
relational_operators/DropTableOperator.cpp | 3 +-
relational_operators/DropTableOperator.hpp | 15 +-
.../FinalizeAggregationOperator.cpp | 6 +-
.../FinalizeAggregationOperator.hpp | 20 +-
relational_operators/HashJoinOperator.cpp | 27 +-
relational_operators/HashJoinOperator.hpp | 242 +++--
relational_operators/InsertOperator.cpp | 6 +-
relational_operators/InsertOperator.hpp | 20 +-
.../NestedLoopsJoinOperator.cpp | 41 +-
.../NestedLoopsJoinOperator.hpp | 54 +-
relational_operators/RebuildWorkOrder.hpp | 19 +-
relational_operators/RelationalOperator.hpp | 8 +-
relational_operators/SampleOperator.cpp | 46 +-
relational_operators/SampleOperator.hpp | 31 +-
relational_operators/SaveBlocksOperator.cpp | 1 +
relational_operators/SaveBlocksOperator.hpp | 14 +-
relational_operators/SelectOperator.cpp | 23 +-
relational_operators/SelectOperator.hpp | 60 +-
relational_operators/SortMergeRunOperator.cpp | 1 +
relational_operators/SortMergeRunOperator.hpp | 29 +-
.../SortRunGenerationOperator.cpp | 4 +-
.../SortRunGenerationOperator.hpp | 28 +-
relational_operators/TableGeneratorOperator.cpp | 7 +-
relational_operators/TableGeneratorOperator.hpp | 23 +-
relational_operators/TextScanOperator.cpp | 27 +-
relational_operators/TextScanOperator.hpp | 34 +-
relational_operators/UpdateOperator.cpp | 22 +-
relational_operators/UpdateOperator.hpp | 42 +-
relational_operators/WorkOrder.hpp | 16 +-
relational_operators/WorkOrder.proto | 1 +
relational_operators/WorkOrderFactory.cpp | 35 +-
.../tests/AggregationOperator_unittest.cpp | 18 +-
.../tests/HashJoinOperator_unittest.cpp | 156 +--
.../tests/SortMergeRunOperator_unittest.cpp | 11 +-
.../SortRunGenerationOperator_unittest.cpp | 16 +-
.../tests/TextScanOperator_unittest.cpp | 5 +-
57 files changed, 1825 insertions(+), 602 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --cc cli/QuickstepCli.cpp
index 6f954fe,558d6eb..d65eb89
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@@ -402,25 -389,14 +402,24 @@@ int main(int argc, char* argv[])
}
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
-
- foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
-
+ // TODO(harshad) - In the future when queries are not admitted
+ // immediately, calculate their waiting time separately.
- LOG(INFO) << "Address of query handle in QuickstepCli: " << query_handle.get();
+ start = std::chrono::steady_clock::now();
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ query_handle.get(),
+ &bus);
+ if (send_status != tmb::MessageBus::SendStatus::kOK) {
+ fprintf(stderr, "\nQuery %s could not be admitted to the system\n", command_string->c_str());
+ continue;
+ }
try {
- start = std::chrono::steady_clock::now();
- foreman.start();
- foreman.join();
+ const AnnotatedMessage annotated_msg =
+ bus.Receive(main_thread_client_id, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
end = std::chrono::steady_clock::now();
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --cc query_execution/AdmitRequestMessage.hpp
index e2a1077,0000000..e33b354
mode 100644,000000..100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@@ -1,75 -1,0 +1,73 @@@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class QueryHandle;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A message requesting a query or queries to be admitted to the system.
+ **/
+class AdmitRequestMessage {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_handles The handles of the queries requesting to be admitted
+ * to the system.
+ **/
+ explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
+ : query_handles_(query_handles) {}
+
+ /**
+ * @brief Constructor for requesting single query admission.
+ *
+ * @param query_handle The handle of the query requesting to be admitted.
+ **/
+ explicit AdmitRequestMessage(QueryHandle *query_handle) {
+ query_handles_.push_back(query_handle);
+ }
+
+ /**
+ * @brief Get the query handles from this message.
+ **/
+ const std::vector<QueryHandle*>& getQueryHandles() const {
- LOG(INFO) << "Query handle in getQueryHandles(): " << query_handles_.front()
- << " [0] " << query_handles_[0];
+ return query_handles_;
+ }
+
+ private:
+ std::vector<QueryHandle*> query_handles_;
+
+ DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --cc query_execution/Foreman.cpp
index 6cec70a,7705819..3609120
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@@ -97,43 -58,161 +97,40 @@@ void Foreman::run()
// We can pin the foreman thread to a CPU if specified.
ThreadUtil::BindToCPU(cpu_id_);
}
- initializeState();
-
- DEBUG_ASSERT(query_dag_ != nullptr);
- const dag_node_index dag_size = query_dag_->size();
-
- // Collect all the workorders from all the relational operators in the DAG.
- for (dag_node_index index = 0; index < dag_size; ++index) {
- if (checkAllBlockingDependenciesMet(index)) {
- query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
- processOperator(index, false);
- }
- }
-
- // Dispatch the WorkOrders generated so far.
- dispatchWorkerMessages(0, 0);
-}
-
-void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
- const size_t worker_thread_index) {
- query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
- // As the given worker finished executing a WorkOrder, decrement its number
- // of queued WorkOrders.
- workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
- // Check if new work orders are available and fetch them if so.
- fetchNormalWorkOrders(op_index);
-
- if (checkRebuildRequired(op_index)) {
- if (checkNormalExecutionOver(op_index)) {
- if (!checkRebuildInitiated(op_index)) {
- if (initiateRebuild(op_index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(op_index);
- } else {
- // Rebuild under progress.
- }
- } else if (checkRebuildOver(op_index)) {
- // Rebuild was under progress and now it is over.
- markOperatorFinished(op_index);
- }
- } else {
- // Normal execution under progress for this operator.
- }
- } else if (checkOperatorExecutionOver(op_index)) {
- // Rebuild not required for this operator and its normal execution is
- // complete.
- markOperatorFinished(op_index);
- }
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- // Process the dependent operator (of the operator whose WorkOrder
- // was just executed) for which all the dependencies have been met.
- processOperator(dependent_op_index, true);
- }
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
- const size_t worker_thread_index) {
- query_exec_state_->decrementNumRebuildWorkOrders(op_index);
- workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
- if (checkRebuildOver(op_index)) {
- markOperatorFinished(op_index);
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processDataPipelineMessage(const dag_node_index op_index,
- const block_id block,
- const relation_id rel_id) {
- for (const dag_node_index consumer_index :
- output_consumers_[op_index]) {
- // Feed the streamed block to the consumer. Note that 'output_consumers_'
- // only contain those dependents of operator with index = op_index which are
- // eligible to receive streamed input.
- query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
- // Because of the streamed input just fed, check if there are any new
- // WorkOrders available and if so, fetch them.
- fetchNormalWorkOrders(consumer_index);
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- // TODO(zuyu): Improve the data locality for the next WorkOrder.
- dispatchWorkerMessages(0, op_index);
-}
-
-void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
- RelationalOperator *op =
- query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
- op->receiveFeedbackMessage(msg);
-}
-
-void Foreman::run() {
- // Initialize before for Foreman eventloop.
- initialize();
// Event loop
- while (!query_exec_state_->hasQueryExecutionFinished()) {
+ for (;;) {
// Receive() causes this thread to sleep until next message is received.
- AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
+ const AnnotatedMessage annotated_msg =
+ bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ switch (message_type) {
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrdersAvailableMessage: {
+ policy_enforcer_->processMessage(tagged_message);
break;
}
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
+ case kAdmitRequestMessage: {
+ const AdmitRequestMessage *msg =
+ static_cast<const AdmitRequestMessage *>(tagged_message.message());
+ const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
- LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() <<
- " [0]: " << query_handles[0];
+ DCHECK(!query_handles.empty());
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
- LOG(INFO) << "Address of query handle in foreman: " << query_handles.front();
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
}
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionUtil.hpp
index 267bbe6,a8b6a38..50f277e
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@@ -65,67 -60,6 +65,65 @@@ class QueryExecutionUtil
std::move(tagged_message));
}
+ /**
+ * @brief Construct and send an AdmitRequestMessage from a given sender to a
+ * given recipient.
+ *
+ * @param sender_id The TMB client ID of the sender.
+ * @param receiver_id The TMB client ID of the receiver.
+ * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+ * @param bus A pointer to the TMB.
+ * @param tagged_message A moved from reference to the tagged message.
+ *
+ * @return A status code indicating the result of the message delivery.
+ * The caller should ensure that the status is SendStatus::kOK.
+ **/
+ static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+ const tmb::client_id sender_id,
+ const tmb::client_id receiver_id,
+ QueryHandle *query_handle,
+ MessageBus *bus) {
- LOG(INFO) << "Address of query handle in QExecUtil: " << query_handle;
- std::unique_ptr<AdmitRequestMessage> request_message(new AdmitRequestMessage(query_handle));
- const std::vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
- LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() << " [0]: " << query_handles[0];
++ std::unique_ptr<AdmitRequestMessage> request_message(
++ new AdmitRequestMessage(query_handle));
+ const std::size_t size_of_request_msg = sizeof(*request_message);
+ TaggedMessage admit_tagged_message(
+ request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+ return QueryExecutionUtil::SendTMBMessage(
+ bus, sender_id, receiver_id, std::move(admit_tagged_message));
+ }
+
+ /**
+ * @brief Broadcast a poison message from a given sender.
+ *
+ * @note This message will be received by all the clients that have registered
+ * as recipients of the poison message type.
+ *
+ * @param sender_id The TMB client ID of the sender.
+ * @param bus A pointer to the TMB.
+ **/
+ static void BroadcastPoisonMessage(const tmb::client_id sender_id,
+ tmb::MessageBus *bus) {
+ // Terminate all threads.
+ // The sender thread broadcasts poison message to the workers and foreman.
+ // Each worker dies after receiving poison message. The order of workers'
+ // death is irrelevant.
+ MessageStyle style;
+ style.Broadcast(true);
+ Address address;
+ address.All(true);
+ std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+ TaggedMessage poison_tagged_message(poison_message.get(),
+ sizeof(*poison_message),
+ kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status = bus->Send(
+ sender_id, address, style, std::move(poison_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+ "Broadcast poison message from sender with TMB client ID " << sender_id
+ << " failed";
+ }
+
private:
/**
* @brief Constructor. Made private to avoid instantiation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --cc query_execution/WorkOrdersContainer.hpp
index a1c4288,eb9aedd..3b93729
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@@ -48,14 -46,10 +48,11 @@@ class WorkOrdersContainer
*
* @param num_operators Number of operators in the query DAG.
* @param num_numa_nodes Number of NUMA nodes in the system.
- * @param query_id The ID of the query.
**/
WorkOrdersContainer(const std::size_t num_operators,
- const std::size_t num_numa_nodes,
- const std::size_t query_id)
+ const std::size_t num_numa_nodes)
- : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
+ : num_operators_(num_operators),
- num_numa_nodes_(num_numa_nodes),
- query_id_(query_id) {
++ num_numa_nodes_(num_numa_nodes) {
DEBUG_ASSERT(num_operators != 0);
for (std::size_t op = 0; op < num_operators; ++op) {
normal_workorders_.push_back(
@@@ -226,9 -220,8 +223,8 @@@
* @param operator_index The index of the operator in the query DAG.
**/
void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK(operator_index < num_operators_);
- workorder->setQueryID(query_id_);
normal_workorders_[operator_index].addWorkOrder(workorder);
}
@@@ -245,9 -238,8 +241,8 @@@
**/
void addRebuildWorkOrder(WorkOrder *workorder,
const std::size_t operator_index) {
- DEBUG_ASSERT(workorder != nullptr);
- DEBUG_ASSERT(operator_index < num_operators_);
+ DCHECK(workorder != nullptr);
+ DCHECK(operator_index < num_operators_);
- workorder->setQueryID(query_id_);
rebuild_workorders_[operator_index].addWorkOrder(workorder);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --cc query_execution/tests/WorkOrdersContainer_unittest.cpp
index 865f01f,cf133c4..cb583ab
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@@ -72,8 -72,7 +72,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
// they get inserted and retrieved correctly.
std::vector<int> numa_node_ids;
// A container for one operator and no NUMA nodes.
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(1, 0, query_id);
+ WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -128,8 -123,7 +128,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
// if they get inserted and retrieved correctly and the order of retrieval.
// A container for one operator and no NUMA nodes.
std::vector<int> numa_node_ids;
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(1, 0, query_id);
+ WorkOrdersContainer w(1, 0);
EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -198,8 -190,7 +198,8 @@@ TEST(WorkOrdersContainerTest, MultipleN
const std::size_t kNUMANodesUsed = numa_node_ids.size();
// A container for one operator and kNUMANodes.
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(1, kNUMANodes, query_id);
+ WorkOrdersContainer w(1, kNUMANodes);
for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
std::vector<int> curr_numa_node;
@@@ -303,8 -291,7 +303,8 @@@ TEST(WorkOrdersContainerTest, AllTypesW
const std::size_t kNUMANodesUsed = numa_nodes.size();
// Create the container.
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(1, kNUMANodes, query_id);
+ WorkOrdersContainer w(1, kNUMANodes);
w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@@ -443,8 -427,7 +443,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+ WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -640,8 -620,7 +640,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
const std::size_t kNUMANodes = numa_node_ids.size();
// Create the container.
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+ WorkOrdersContainer w(kNumOperators, kNUMANodes);
std::vector<std::size_t> operator_ids;
for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -796,8 -772,7 +796,8 @@@ TEST(WorkOrdersContainerTest, Retrieval
numa_node_ids.push_back(0);
const std::size_t kNumWorkOrdersPerType = 100;
+ const std::size_t query_id = 0;
- WorkOrdersContainer w(1, 2, query_id);
+ WorkOrdersContainer w(1, 2);
std::vector<int> single_numa_node_workorder_ids;
std::vector<int> multiple_numa_node_workorder_ids;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/DeleteOperator.hpp
index a239f42,c55f585..fdc9b00
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@@ -159,8 -162,6 +162,7 @@@ class DeleteWorkOrder : public WorkOrde
StorageManager *storage_manager_;
const std::size_t delete_operator_index_;
- const std::size_t query_id_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/RebuildWorkOrder.hpp
index fef2cc9,86f8eaf..d11fe7d
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@@ -55,16 -56,17 +56,18 @@@ class RebuildWorkOrder : public WorkOrd
* @param input_relation_id The ID of the CatalogRelation to which the given
* storage block belongs.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
+ * @param query_id The ID of the query.
* @param bus A pointer to the TMB.
**/
- RebuildWorkOrder(MutableBlockReference &&block_ref,
- const std::size_t input_operator_index,
- const relation_id input_relation_id,
- const client_id scheduler_client_id,
- const std::size_t query_id,
- MessageBus *bus)
- : block_ref_(std::move(block_ref)),
+ RebuildWorkOrder(
+ const std::size_t query_id,
+ MutableBlockReference &&block_ref, // NOLINT(whitespace/operators)
+ const std::size_t input_operator_index,
+ const relation_id input_relation_id,
+ const client_id scheduler_client_id,
+ MessageBus *bus)
+ : WorkOrder(query_id),
+ block_ref_(std::move(block_ref)),
input_operator_index_(input_operator_index),
input_relation_id_(input_relation_id),
scheduler_client_id_(scheduler_client_id),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.cpp
index d1f1932,5acecbf..3899af4
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@@ -604,7 -609,6 +609,7 @@@ void TextSplitWorkOrder::execute()
// Notify the operator about the completion of this Work Order.
FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
- getQueryID(),
++ query_id_,
operator_index_,
nullptr /* payload */,
0 /* payload_size */,
@@@ -666,7 -670,6 +671,7 @@@ void TextSplitWorkOrder::sendBlobInfoTo
const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
- getQueryID(),
++ query_id_,
operator_index_,
payload,
payload_size);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.hpp
index f87b530,3cda65b..4fd5c04
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@@ -372,8 -381,6 +381,7 @@@ class TextSplitWorkOrder : public WorkO
StorageManager *storage_manager_;
const std::size_t operator_index_; // Operator index.
- const std::size_t query_id_; // query ID.
+
const tmb::client_id scheduler_client_id_; // The scheduler's TMB client ID.
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/UpdateOperator.hpp
index 9673229,cebb9b5..b4f9b9d
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@@ -174,8 -181,6 +181,7 @@@ class UpdateWorkOrder : public WorkOrde
StorageManager *storage_manager_;
const std::size_t update_operator_index_;
- const std::size_t query_id_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/WorkOrder.hpp
index fd4b0f1,059865d..df195cc
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@@ -292,25 -285,16 +292,23 @@@ class WorkOrder
" receiver thread with TMB client ID " << receiver_id;
}
+ /**
+ * @brief Get the ID of the query which this WorkOrder belongs to.
+ **/
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
+ protected:
/**
- * @brief Set the ID of the query which the WorkOrder belongs to.
+ * @brief Constructor.
*
- * @param query_id The query ID.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
**/
- void setQueryID(const std::size_t query_id) {
- query_id_ = query_id;
- }
-
- protected:
- WorkOrder() {}
+ explicit WorkOrder(const std::size_t query_id)
+ : query_id_(query_id) {}
+ const std::size_t query_id_;
// A vector of preferred NUMA node IDs where this workorder should be executed.
// These node IDs typically indicate the NUMA node IDs of the input(s) of the
// workorder. Derived classes should ensure that there are no duplicate entries
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------