You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/10/05 22:03:11 UTC
[16/51] [abbrv] incubator-quickstep git commit: Simplified the work order generation.
Simplified the work order generation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8d7284de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8d7284de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8d7284de
Branch: refs/heads/new-op
Commit: 8d7284decb7ebf5c0eaac232f39027ddd8bf6144
Parents: 77960a4
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Mon Aug 21 19:51:55 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Fri Sep 22 13:43:08 2017 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 2 -
query_execution/ForemanDistributed.cpp | 5 +-
query_execution/ForemanSingleNode.cpp | 16 +--
query_execution/QueryManagerBase.cpp | 136 ++++++++-----------
query_execution/QueryManagerBase.hpp | 79 ++---------
query_execution/QueryManagerDistributed.cpp | 54 +++-----
query_execution/QueryManagerDistributed.hpp | 3 +-
query_execution/QueryManagerSingleNode.cpp | 58 ++++----
query_execution/QueryManagerSingleNode.hpp | 7 +-
query_execution/WorkOrdersContainer.hpp | 1 +
.../tests/QueryManagerSingleNode_unittest.cpp | 58 ++++----
11 files changed, 152 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5c750f0..9394c00 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -119,7 +119,6 @@ if (ENABLE_DISTRIBUTED)
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
quickstep_threading_ThreadUtil
- quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
@@ -135,7 +134,6 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_threading_ThreadUtil
- quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 942f383..82cc624 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -48,7 +48,6 @@
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
#include "glog/logging.h"
@@ -233,9 +232,7 @@ void ForemanDistributed::run() {
}
bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
- return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
- kCatalogRelationNewBlockMessage,
- kWorkOrderFeedbackMessage);
+ return message_type != kCatalogRelationNewBlockMessage;
}
bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 1501408..d66f1f5 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -33,7 +33,6 @@
#include "query_execution/WorkerDirectory.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
#include "utility/Macros.hpp"
#include "gflags/gflags.h"
@@ -179,18 +178,13 @@ void ForemanSingleNode::run() {
}
bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
- if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
- kCatalogRelationNewBlockMessage,
- kWorkOrderFeedbackMessage)) {
- return false;
- } else if (worker_directory_->getLeastLoadedWorker().second <=
- FLAGS_min_load_per_worker) {
- // If the least loaded worker has only one pending work order, we should
- // collect new messages and dispatch them.
- return true;
- } else {
+ if (message_type == kCatalogRelationNewBlockMessage) {
return false;
}
+
+ // If the least loaded worker has only one pending work order, we should
+ // collect new messages and dispatch them.
+ return (worker_directory_->getLeastLoadedWorker().second <= FLAGS_min_load_per_worker);
}
void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 565c6ad..374c96d 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -50,7 +50,9 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
num_operators_in_dag_(query_dag_->size()),
output_consumers_(num_operators_in_dag_),
blocking_dependencies_(num_operators_in_dag_),
- query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+ query_exec_state_(new QueryExecutionState(num_operators_in_dag_)),
+ blocking_dependents_(num_operators_in_dag_),
+ non_blocking_dependencies_(num_operators_in_dag_) {
if (FLAGS_visualize_execution_dag) {
dag_visualizer_ =
std::make_unique<quickstep::ExecutionDAGVisualizer>(query_handle_->getQueryPlan());
@@ -66,16 +68,22 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
query_exec_state_->setRebuildRequired(node_index);
}
+ if (query_dag_->getDependencies(node_index).empty()) {
+ non_dependent_operators_.push_back(node_index);
+ }
+
for (const pair<dag_node_index, bool> &dependent_link :
query_dag_->getDependents(node_index)) {
const dag_node_index dependent_op_index = dependent_link.first;
if (query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
// The link is a pipeline-breaker. Streaming of blocks is not possible
// between these two operators.
- blocking_dependencies_[dependent_op_index].push_back(node_index);
+ blocking_dependencies_[dependent_op_index].insert(node_index);
+ blocking_dependents_[node_index].push_back(dependent_op_index);
} else {
// The link is not a pipeline-breaker. Streaming of blocks is possible
// between these two operators.
+ non_blocking_dependencies_[dependent_op_index].insert(node_index);
output_consumers_[node_index].push_back(dependent_op_index);
}
}
@@ -102,6 +110,12 @@ void QueryManagerBase::processFeedbackMessage(
RelationalOperator *op =
query_dag_->getNodePayloadMutable(op_index);
op->receiveFeedbackMessage(msg);
+
+ if (query_exec_state_->hasDoneGenerationWorkOrders(op_index)) {
+ return;
+ }
+
+ fetchNormalWorkOrders(op_index);
}
void QueryManagerBase::processWorkOrderCompleteMessage(
@@ -109,97 +123,32 @@ void QueryManagerBase::processWorkOrderCompleteMessage(
const partition_id part_id) {
query_exec_state_->decrementNumQueuedWorkOrders(op_index);
- // Check if new work orders are available and fetch them if so.
- fetchNormalWorkOrders(op_index);
+ if (!checkNormalExecutionOver(op_index)) {
+ // Normal execution under progress for this operator.
+ return;
+ }
if (checkRebuildRequired(op_index)) {
- if (checkNormalExecutionOver(op_index)) {
- if (!checkRebuildInitiated(op_index)) {
- if (initiateRebuild(op_index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(op_index);
- } else {
- // Rebuild under progress.
- }
- } else if (checkRebuildOver(op_index)) {
- // Rebuild was under progress and now it is over.
- markOperatorFinished(op_index);
- }
- } else {
- // Normal execution under progress for this operator.
+ DCHECK(!checkRebuildInitiated(op_index));
+ if (!initiateRebuild(op_index)) {
+ // Rebuild under progress.
+ return;
}
- } else if (checkOperatorExecutionOver(op_index)) {
- // Rebuild not required for this operator and its normal execution is
- // complete.
- markOperatorFinished(op_index);
+ // Rebuild initiated and completed right away.
}
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- // Process the dependent operator (of the operator whose WorkOrder
- // was just executed) for which all the dependencies have been met.
- processOperator(dependent_op_index, true);
- }
- }
+ markOperatorFinished(op_index);
}
void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
const partition_id part_id) {
query_exec_state_->decrementNumRebuildWorkOrders(op_index);
- if (checkRebuildOver(op_index)) {
- markOperatorFinished(op_index);
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
-}
-
-void QueryManagerBase::processOperator(const dag_node_index index,
- const bool recursively_check_dependents) {
- if (fetchNormalWorkOrders(index)) {
- // Fetched work orders. Return to wait for the generated work orders to
- // execute, and skip the execution-finished checks.
+ if (!checkRebuildOver(op_index)) {
return;
}
- if (checkNormalExecutionOver(index)) {
- if (checkRebuildRequired(index)) {
- if (!checkRebuildInitiated(index)) {
- // Rebuild hasn't started, initiate it.
- if (initiateRebuild(index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(index);
- } else {
- // Rebuild WorkOrders have been generated.
- return;
- }
- } else if (checkRebuildOver(index)) {
- // Rebuild had been initiated and it is over.
- markOperatorFinished(index);
- }
- } else {
- // Rebuild is not required and normal execution over, mark finished.
- markOperatorFinished(index);
- }
- // If we reach here, that means the operator has been marked as finished.
- if (recursively_check_dependents) {
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
- }
+ markOperatorFinished(op_index);
}
void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
@@ -214,23 +163,44 @@ void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id);
// Because of the streamed input just fed, check if there are any new
// WorkOrders available and if so, fetch them.
- fetchNormalWorkOrders(consumer_index);
+ if (checkAllBlockingDependenciesMet(consumer_index)) {
+ fetchNormalWorkOrders(consumer_index);
+ }
}
}
void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
query_exec_state_->setExecutionFinished(index);
+ for (const dag_node_index dependent_op_index : blocking_dependents_[index]) {
+ blocking_dependencies_[dependent_op_index].erase(index);
+ }
+
+ for (const dag_node_index dependent_op_index : output_consumers_[index]) {
+ non_blocking_dependencies_[dependent_op_index].erase(index);
+ }
+
RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
op->updateCatalogOnCompletion();
const relation_id output_rel = op->getOutputRelationID();
+
for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
const dag_node_index dependent_op_index = dependent_link.first;
- RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
- // Signal dependent operator that current operator is done feeding input blocks.
if (output_rel >= 0) {
- dependent_op->doneFeedingInputBlocks(output_rel);
+ // Signal dependent operator that current operator is done feeding input blocks.
+ query_dag_->getNodePayloadMutable(dependent_op_index)->doneFeedingInputBlocks(output_rel);
+ }
+
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+ // Process the dependent operator (of the operator whose WorkOrder
+ // was just executed) for which all the dependencies have been met.
+ if (!fetchNormalWorkOrders(dependent_op_index) &&
+ non_blocking_dependencies_[dependent_op_index].empty() &&
+ checkNormalExecutionOver(dependent_op_index) &&
+ (!checkRebuildRequired(dependent_op_index) || initiateRebuild(dependent_op_index))) {
+ markOperatorFinished(dependent_op_index);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 78d67cc..366ab61 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <memory>
+#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -165,56 +166,20 @@ class QueryManagerBase {
protected:
/**
- * @brief Process a current relational operator: Get its workorders and store
- * them in the WorkOrdersContainer for this query. If the operator can
- * be marked as done, do so.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- * @param recursively_check_dependents If an operator is done, should we
- * call processOperator on its dependents recursively.
- **/
- void processOperator(const dag_node_index index,
- const bool recursively_check_dependents);
-
- /**
* @brief This function does the following things:
* 1. Mark the given relational operator as "done".
- * 2. For all the dependents of this operator, check if all of their
- * blocking dependencies are met. If so inform them that the blocking
- * dependencies are met.
- * 3. Check if the given operator is done producing output. If it's
- * done, inform the dependents that they won't receive input anymore
- * from the given operator.
+ * 2. For all the dependents of this operator, check if the given
+ * operator is done producing output. If it's done, inform the
+ * dependents that they won't receive input anymore from the given
+ * operator.
+ * 3. Check if all of their blocking dependencies are met. If so
+ * fetch normal work orders.
*
* @param index The index of the given relational operator in the DAG.
**/
void markOperatorFinished(const dag_node_index index);
/**
- * @brief Check if all the dependencies of the node at specified index have
- * finished their execution.
- *
- * @note This function's true return value is a pre-requisite for calling
- * getRebuildWorkOrders()
- *
- * @param node_index The index of the specified node in the query DAG.
- *
- * @return True if all the dependencies have finished their execution. False
- * otherwise.
- **/
- inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
- for (const dag_node_index dependency_index :
- query_dag_->getDependencies(node_index)) {
- // If at least one of the dependencies is not met, return false.
- if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
* @brief Check if all the blocking dependencies of the node at specified
* index have finished their execution.
*
@@ -229,27 +194,7 @@ class QueryManagerBase {
**/
inline bool checkAllBlockingDependenciesMet(
const dag_node_index node_index) const {
- for (const dag_node_index blocking_dependency_index :
- blocking_dependencies_[node_index]) {
- if (!query_exec_state_->hasExecutionFinished(
- blocking_dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @brief Check if the execution of the given operator is over.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the execution of the given operator is over, false
- * otherwise.
- **/
- inline bool checkOperatorExecutionOver(const dag_node_index index) const {
- return this->checkNormalExecutionOver(index) &&
- (!checkRebuildRequired(index) || this->checkRebuildOver(index));
+ return blocking_dependencies_[node_index].empty();
}
/**
@@ -295,7 +240,9 @@ class QueryManagerBase {
std::vector<std::vector<dag_node_index>> output_consumers_;
// For all nodes, store their pipeline breaking dependencies (if any).
- std::vector<std::vector<dag_node_index>> blocking_dependencies_;
+ std::vector<std::unordered_set<dag_node_index>> blocking_dependencies_;
+
+ std::vector<dag_node_index> non_dependent_operators_;
std::unique_ptr<QueryExecutionState> query_exec_state_;
@@ -338,6 +285,10 @@ class QueryManagerBase {
**/
virtual bool checkRebuildOver(const dag_node_index index) const = 0;
+ // For all nodes, store their pipeline breaking dependents (if any).
+ std::vector<std::vector<dag_node_index>> blocking_dependents_;
+ std::vector<std::unordered_set<dag_node_index>> non_blocking_dependencies_;
+
DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 1144e9f..30a1396 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -67,10 +67,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
bus_(bus),
normal_workorder_protos_container_(
new WorkOrderProtosContainer(num_operators_in_dag_)) {
- // Collect all the workorders from all the relational operators in the DAG.
- for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
- if (checkAllBlockingDependenciesMet(index)) {
- processOperator(index, false);
+ // Collect all the workorders from all the relational operators in the DAG that have no dependencies.
+ for (const dag_node_index index : non_dependent_operators_) {
+ if (!fetchNormalWorkOrders(index)) {
+ DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
+ markOperatorFinished(index);
}
}
@@ -177,35 +178,22 @@ serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessag
}
bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
- bool generated_new_workorder_protos = false;
- if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
- // Do not fetch any work units until all blocking dependencies are met.
- // The releational operator is not aware of blocking dependencies for
- // uncorrelated scalar queries.
- if (!checkAllBlockingDependenciesMet(index)) {
- return false;
- }
- const size_t num_pending_workorder_protos_before =
- normal_workorder_protos_container_->getNumWorkOrderProtos(index);
- const bool done_generation =
- query_dag_->getNodePayloadMutable(index)
- ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
- if (done_generation) {
- query_exec_state_->setDoneGenerationWorkOrders(index);
- }
-
- // TODO(shoban): It would be a good check to see if operator is making
- // useful progress, i.e., the operator either generates work orders to
- // execute or still has pending work orders executing. However, this will not
- // work if Foreman polls operators without feeding data. This check can be
- // enabled, if Foreman is refactored to call getAllWorkOrders() only when
- // pending work orders are completed or new input blocks feed.
-
- generated_new_workorder_protos =
- (num_pending_workorder_protos_before <
- normal_workorder_protos_container_->getNumWorkOrderProtos(index));
+ // Do not fetch any work units until all blocking dependencies are met.
+ // The relational operator is not aware of blocking dependencies for
+ // uncorrelated scalar queries.
+ DCHECK(checkAllBlockingDependenciesMet(index));
+ DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index));
+
+ const size_t num_pending_workorder_protos_before =
+ normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+ const bool done_generation =
+ query_dag_->getNodePayloadMutable(index)
+ ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
+ if (done_generation) {
+ query_exec_state_->setDoneGenerationWorkOrders(index);
}
- return generated_new_workorder_protos;
+
+ return (num_pending_workorder_protos_before < normal_workorder_protos_container_->getNumWorkOrderProtos(index));
}
void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
@@ -225,7 +213,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
query_dag_->getDependents(op_index)) {
const dag_node_index dependent_op_index = dependent_link.first;
if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
+ fetchNormalWorkOrders(dependent_op_index);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index a021fdd..8d870c6 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -250,8 +250,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
private:
bool checkNormalExecutionOver(const dag_node_index index) const override {
- return (checkAllDependenciesMet(index) &&
- !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+ return (!normal_workorder_protos_container_->hasWorkOrderProto(index) &&
query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
query_exec_state_->hasDoneGenerationWorkOrders(index));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 82a0de6..2c9f673 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -61,10 +61,11 @@ QueryManagerSingleNode::QueryManagerSingleNode(
workorders_container_(
new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
database_(static_cast<const CatalogDatabase&>(*catalog_database)) {
- // Collect all the workorders from all the relational operators in the DAG.
- for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
- if (checkAllBlockingDependenciesMet(index)) {
- processOperator(index, false);
+ // Collect all the workorders from all the relational operators in the DAG that have no dependencies.
+ for (const dag_node_index index : non_dependent_operators_) {
+ if (!fetchNormalWorkOrders(index)) {
+ DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
+ markOperatorFinished(index);
}
}
}
@@ -87,38 +88,25 @@ WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
}
bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
- bool generated_new_workorders = false;
- if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
- // Do not fetch any work units until all blocking dependencies are met.
- // The releational operator is not aware of blocking dependencies for
- // uncorrelated scalar queries.
- if (!checkAllBlockingDependenciesMet(index)) {
- return false;
- }
- const size_t num_pending_workorders_before =
- workorders_container_->getNumNormalWorkOrders(index);
- const bool done_generation =
- query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
- query_context_.get(),
- storage_manager_,
- foreman_client_id_,
- bus_);
- if (done_generation) {
- query_exec_state_->setDoneGenerationWorkOrders(index);
- }
-
- // TODO(shoban): It would be a good check to see if operator is making
- // useful progress, i.e., the operator either generates work orders to
- // execute or still has pending work orders executing. However, this will not
- // work if Foreman polls operators without feeding data. This check can be
- // enabled, if Foreman is refactored to call getAllWorkOrders() only when
- // pending work orders are completed or new input blocks feed.
-
- generated_new_workorders =
- (num_pending_workorders_before <
- workorders_container_->getNumNormalWorkOrders(index));
+ // Do not fetch any work units until all blocking dependencies are met.
+ // The relational operator is not aware of blocking dependencies for
+ // uncorrelated scalar queries.
+ DCHECK(checkAllBlockingDependenciesMet(index));
+ DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index));
+
+ const size_t num_pending_workorders_before =
+ workorders_container_->getNumNormalWorkOrders(index);
+ const bool done_generation =
+ query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
+ query_context_.get(),
+ storage_manager_,
+ foreman_client_id_,
+ bus_);
+ if (done_generation) {
+ query_exec_state_->setDoneGenerationWorkOrders(index);
}
- return generated_new_workorders;
+
+ return (num_pending_workorders_before < workorders_container_->getNumNormalWorkOrders(index));
}
bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index f9d038b..a726bbc 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -99,8 +99,7 @@ class QueryManagerSingleNode final : public QueryManagerBase {
private:
bool checkNormalExecutionOver(const dag_node_index index) const override {
- return (checkAllDependenciesMet(index) &&
- !workorders_container_->hasNormalWorkOrder(index) &&
+ return (!workorders_container_->hasNormalWorkOrder(index) &&
query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
query_exec_state_->hasDoneGenerationWorkOrders(index));
}
@@ -108,8 +107,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
bool initiateRebuild(const dag_node_index index) override;
bool checkRebuildOver(const dag_node_index index) const override {
- return query_exec_state_->hasRebuildInitiated(index) &&
- !workorders_container_->hasRebuildWorkOrder(index) &&
+ DCHECK(query_exec_state_->hasRebuildInitiated(index));
+ return !workorders_container_->hasRebuildWorkOrder(index) &&
(query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index e8d5ff8..3c2d9bf 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -542,6 +542,7 @@ class WorkOrdersContainer {
DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
};
+
/** @} */
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 19b42ac..dd3f472 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -353,14 +353,14 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
// This test creates a DAG of a single node. WorkOrders are generated
// dynamically as pending work orders complete execution, i.e.,
// getAllWorkOrders() is called multiple times. getAllWorkOrders() will be
- // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to
- // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will
- // insert no WorkOrder and return true.
+ // called 3 times and 3 work orders will be returned, i.e., the first 2 calls to
+ // getAllWorkOrders() insert 2 WorkOrders in total and return false, and the last
+ // will insert 1 WorkOrder and return true.
// TODO(shoban): This test can not be more robust than this because of fixed
// scaffolding of mocking. If we use gMock, we can do much better.
const QueryPlan::DAGNodeIndex id =
- query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
+ query_plan_->addRelationalOperator(new MockOperator(true, false, 3, 3));
const MockOperator &op = static_cast<const MockOperator &>(
query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -378,7 +378,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
unique_ptr<WorkerMessage> worker_message;
worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
- EXPECT_TRUE(worker_message != nullptr);
+ ASSERT_TRUE(worker_message != nullptr);
EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
worker_message->getType());
EXPECT_EQ(id, worker_message->getRelationalOpIndex());
@@ -391,6 +391,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
if (i < 2) {
// Send a message to QueryManager upon workorder completion.
EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
+ query_manager_->fetchNormalWorkOrders(id);
} else {
// Send a message to QueryManager upon workorder completion.
// Last event.
@@ -511,7 +512,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
const QueryPlan::DAGNodeIndex id1 =
query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
const QueryPlan::DAGNodeIndex id2 =
- query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
+ query_plan_->addRelationalOperator(new MockOperator(true, true, 2));
// Create a non-blocking link.
query_plan_->addDirectDependency(id2, id1, false);
@@ -531,7 +532,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
EXPECT_EQ(1, op1.getNumWorkOrders());
EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+ EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
// op2 will generate workorder only after receiving a streaming input.
EXPECT_EQ(0, op2.getNumWorkOrders());
EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
@@ -562,7 +563,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
// A call to op2's getAllWorkOrders because of the streamed input.
- EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+ EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, op2.getNumWorkOrders());
// Place a message of a workorder completion of op1 on Foreman's input queue.
@@ -573,7 +574,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
// An additional call to op2's getAllWorkOrders because of completion of op1.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+ EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(2, op2.getNumWorkOrders());
worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
@@ -620,7 +621,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
const QueryPlan::DAGNodeIndex id1 =
query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
const QueryPlan::DAGNodeIndex id2 =
- query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
+ query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
// Create a non-blocking link.
query_plan_->addDirectDependency(id2, id1, false);
@@ -670,7 +671,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, op1.getNumWorkOrders());
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+ EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(0, op2.getNumWorkOrders());
unique_ptr<WorkerMessage> worker_message;
@@ -704,7 +705,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
// Based on the streamed input, op2's getAllWorkOrders should produce a
// workorder.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+ EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, op2.getNumWorkOrders());
worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
@@ -734,16 +735,14 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
// When an operator produces workorders but no output, the QueryManager should
// check the dependents of this operator to make progress.
const QueryPlan::DAGNodeIndex kNumNodes = 5;
- std::vector<QueryPlan::DAGNodeIndex> ids;
- ids.reserve(kNumNodes);
for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
if (i == 0) {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
+ query_plan_->addRelationalOperator(new MockOperator(true, false));
} else {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
+ query_plan_->addRelationalOperator(new MockOperator(true, true));
}
- VLOG(3) << ids[i];
+ VLOG(3) << i;
}
/**
@@ -753,46 +752,47 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
*
**/
for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
- query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+ query_plan_->addDirectDependency(i + 1, i, false);
+ static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(i))
->setOutputRelationID(0xdead);
}
std::vector<const MockOperator*> operators;
for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+ operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(i)));
}
constructQueryManager();
// operators[0] should have produced a workorder by now.
+ EXPECT_EQ(1, operators[0]->getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, operators[0]->getNumWorkOrders());
unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+ worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));
EXPECT_TRUE(worker_message != nullptr);
EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
worker_message->getType());
- EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+ EXPECT_EQ(0, worker_message->getRelationalOpIndex());
delete worker_message->getWorkOrder();
- EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
- EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+ EXPECT_EQ(1, getNumWorkOrdersInExecution(0));
+ EXPECT_FALSE(getOperatorFinishedStatus(0));
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders));
+ for (QueryPlan::DAGNodeIndex i = 1; i < kNumNodes; ++i) {
+ EXPECT_EQ(0, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
}
// Send a message to QueryManager upon workorder (generated by operators[0])
// completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+ EXPECT_TRUE(placeWorkOrderCompleteMessage(0));
for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
- EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+ EXPECT_EQ(0, getNumWorkOrdersInExecution(i));
+ EXPECT_TRUE(getOperatorFinishedStatus(i));
if (i < kNumNodes - 1) {
EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
}