You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/02/26 19:15:44 UTC

[03/46] incubator-quickstep git commit: Simplified the work order generation.

Simplified the work order generation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8d7284de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8d7284de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8d7284de

Branch: refs/heads/fix-iwyu
Commit: 8d7284decb7ebf5c0eaac232f39027ddd8bf6144
Parents: 77960a4
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Mon Aug 21 19:51:55 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Fri Sep 22 13:43:08 2017 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   2 -
 query_execution/ForemanDistributed.cpp          |   5 +-
 query_execution/ForemanSingleNode.cpp           |  16 +--
 query_execution/QueryManagerBase.cpp            | 136 ++++++++-----------
 query_execution/QueryManagerBase.hpp            |  79 ++---------
 query_execution/QueryManagerDistributed.cpp     |  54 +++-----
 query_execution/QueryManagerDistributed.hpp     |   3 +-
 query_execution/QueryManagerSingleNode.cpp      |  58 ++++----
 query_execution/QueryManagerSingleNode.hpp      |   7 +-
 query_execution/WorkOrdersContainer.hpp         |   1 +
 .../tests/QueryManagerSingleNode_unittest.cpp   |  58 ++++----
 11 files changed, 152 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5c750f0..9394c00 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -119,7 +119,6 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageManager
                         quickstep_threading_ThreadUtil
-                        quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros
                         tmb
                         ${GFLAGS_LIB_NAME})
@@ -135,7 +134,6 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_threading_ThreadUtil
-                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 942f383..82cc624 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -48,7 +48,6 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
 
 #include "glog/logging.h"
 
@@ -233,9 +232,7 @@ void ForemanDistributed::run() {
 }
 
 bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
-  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
-                                        kCatalogRelationNewBlockMessage,
-                                        kWorkOrderFeedbackMessage);
+  return message_type != kCatalogRelationNewBlockMessage;
 }
 
 bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 1501408..d66f1f5 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -33,7 +33,6 @@
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_execution/WorkerMessage.hpp"
 #include "threading/ThreadUtil.hpp"
-#include "utility/EqualsAnyConstant.hpp"
 #include "utility/Macros.hpp"
 
 #include "gflags/gflags.h"
@@ -179,18 +178,13 @@ void ForemanSingleNode::run() {
 }
 
 bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) {
-  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
-                                    kCatalogRelationNewBlockMessage,
-                                    kWorkOrderFeedbackMessage)) {
-    return false;
-  } else if (worker_directory_->getLeastLoadedWorker().second <=
-             FLAGS_min_load_per_worker) {
-    // If the least loaded worker has only one pending work order, we should
-    // collect new messages and dispatch them.
-    return true;
-  } else {
+  if (message_type == kCatalogRelationNewBlockMessage) {
     return false;
   }
+
+  // If the least loaded worker has only one pending work order, we should
+  // collect new messages and dispatch them.
+  return (worker_directory_->getLeastLoadedWorker().second <= FLAGS_min_load_per_worker);
 }
 
 void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 565c6ad..374c96d 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -50,7 +50,9 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
       num_operators_in_dag_(query_dag_->size()),
       output_consumers_(num_operators_in_dag_),
       blocking_dependencies_(num_operators_in_dag_),
-      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)),
+      blocking_dependents_(num_operators_in_dag_),
+      non_blocking_dependencies_(num_operators_in_dag_) {
   if (FLAGS_visualize_execution_dag) {
     dag_visualizer_ =
         std::make_unique<quickstep::ExecutionDAGVisualizer>(query_handle_->getQueryPlan());
@@ -66,16 +68,22 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
       query_exec_state_->setRebuildRequired(node_index);
     }
 
+    if (query_dag_->getDependencies(node_index).empty()) {
+      non_dependent_operators_.push_back(node_index);
+    }
+
     for (const pair<dag_node_index, bool> &dependent_link :
          query_dag_->getDependents(node_index)) {
       const dag_node_index dependent_op_index = dependent_link.first;
       if (query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
         // The link is a pipeline-breaker. Streaming of blocks is not possible
         // between these two operators.
-        blocking_dependencies_[dependent_op_index].push_back(node_index);
+        blocking_dependencies_[dependent_op_index].insert(node_index);
+        blocking_dependents_[node_index].push_back(dependent_op_index);
       } else {
         // The link is not a pipeline-breaker. Streaming of blocks is possible
         // between these two operators.
+        non_blocking_dependencies_[dependent_op_index].insert(node_index);
         output_consumers_[node_index].push_back(dependent_op_index);
       }
     }
@@ -102,6 +110,12 @@ void QueryManagerBase::processFeedbackMessage(
   RelationalOperator *op =
       query_dag_->getNodePayloadMutable(op_index);
   op->receiveFeedbackMessage(msg);
+
+  if (query_exec_state_->hasDoneGenerationWorkOrders(op_index)) {
+    return;
+  }
+
+  fetchNormalWorkOrders(op_index);
 }
 
 void QueryManagerBase::processWorkOrderCompleteMessage(
@@ -109,97 +123,32 @@ void QueryManagerBase::processWorkOrderCompleteMessage(
     const partition_id part_id) {
   query_exec_state_->decrementNumQueuedWorkOrders(op_index);
 
-  // Check if new work orders are available and fetch them if so.
-  fetchNormalWorkOrders(op_index);
+  if (!checkNormalExecutionOver(op_index)) {
+    // Normal execution under progress for this operator.
+    return;
+  }
 
   if (checkRebuildRequired(op_index)) {
-    if (checkNormalExecutionOver(op_index)) {
-      if (!checkRebuildInitiated(op_index)) {
-        if (initiateRebuild(op_index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(op_index);
-        } else {
-          // Rebuild under progress.
-        }
-      } else if (checkRebuildOver(op_index)) {
-        // Rebuild was under progress and now it is over.
-        markOperatorFinished(op_index);
-      }
-    } else {
-      // Normal execution under progress for this operator.
+    DCHECK(!checkRebuildInitiated(op_index));
+    if (!initiateRebuild(op_index)) {
+      // Rebuild under progress.
+      return;
     }
-  } else if (checkOperatorExecutionOver(op_index)) {
-    // Rebuild not required for this operator and its normal execution is
-    // complete.
-    markOperatorFinished(op_index);
+    // Rebuild initiated and completed right away.
   }
 
-  for (const pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      // Process the dependent operator (of the operator whose WorkOrder
-      // was just executed) for which all the dependencies have been met.
-      processOperator(dependent_op_index, true);
-    }
-  }
+  markOperatorFinished(op_index);
 }
 
 void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
                                                               const partition_id part_id) {
   query_exec_state_->decrementNumRebuildWorkOrders(op_index);
 
-  if (checkRebuildOver(op_index)) {
-    markOperatorFinished(op_index);
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(op_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-        processOperator(dependent_op_index, true);
-      }
-    }
-  }
-}
-
-void QueryManagerBase::processOperator(const dag_node_index index,
-                                       const bool recursively_check_dependents) {
-  if (fetchNormalWorkOrders(index)) {
-    // Fetched work orders. Return to wait for the generated work orders to
-    // execute, and skip the execution-finished checks.
+  if (!checkRebuildOver(op_index)) {
     return;
   }
 
-  if (checkNormalExecutionOver(index)) {
-    if (checkRebuildRequired(index)) {
-      if (!checkRebuildInitiated(index)) {
-        // Rebuild hasn't started, initiate it.
-        if (initiateRebuild(index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(index);
-        } else {
-          // Rebuild WorkOrders have been generated.
-          return;
-        }
-      } else if (checkRebuildOver(index)) {
-        // Rebuild had been initiated and it is over.
-        markOperatorFinished(index);
-      }
-    } else {
-      // Rebuild is not required and normal execution over, mark finished.
-      markOperatorFinished(index);
-    }
-    // If we reach here, that means the operator has been marked as finished.
-    if (recursively_check_dependents) {
-      for (const pair<dag_node_index, bool> &dependent_link :
-           query_dag_->getDependents(index)) {
-        const dag_node_index dependent_op_index = dependent_link.first;
-        if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-          processOperator(dependent_op_index, true);
-        }
-      }
-    }
-  }
+  markOperatorFinished(op_index);
 }
 
 void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
@@ -214,23 +163,44 @@ void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
     query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id);
     // Because of the streamed input just fed, check if there are any new
     // WorkOrders available and if so, fetch them.
-    fetchNormalWorkOrders(consumer_index);
+    if (checkAllBlockingDependenciesMet(consumer_index)) {
+      fetchNormalWorkOrders(consumer_index);
+    }
   }
 }
 
 void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
   query_exec_state_->setExecutionFinished(index);
 
+  for (const dag_node_index dependent_op_index : blocking_dependents_[index]) {
+    blocking_dependencies_[dependent_op_index].erase(index);
+  }
+
+  for (const dag_node_index dependent_op_index : output_consumers_[index]) {
+    non_blocking_dependencies_[dependent_op_index].erase(index);
+  }
+
   RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
   op->updateCatalogOnCompletion();
 
   const relation_id output_rel = op->getOutputRelationID();
+
   for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
     const dag_node_index dependent_op_index = dependent_link.first;
-    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
-    // Signal dependent operator that current operator is done feeding input blocks.
     if (output_rel >= 0) {
-      dependent_op->doneFeedingInputBlocks(output_rel);
+      // Signal dependent operator that current operator is done feeding input blocks.
+      query_dag_->getNodePayloadMutable(dependent_op_index)->doneFeedingInputBlocks(output_rel);
+    }
+
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      // Process the dependent operator (of the operator whose WorkOrder
+      // was just executed) for which all the dependencies have been met.
+      if (!fetchNormalWorkOrders(dependent_op_index) &&
+          non_blocking_dependencies_[dependent_op_index].empty() &&
+          checkNormalExecutionOver(dependent_op_index) &&
+          (!checkRebuildRequired(dependent_op_index) || initiateRebuild(dependent_op_index))) {
+        markOperatorFinished(dependent_op_index);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 78d67cc..366ab61 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -22,6 +22,7 @@
 
 #include <cstddef>
 #include <memory>
+#include <unordered_set>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -165,56 +166,20 @@ class QueryManagerBase {
 
  protected:
   /**
-   * @brief Process a current relational operator: Get its workorders and store
-   *        them in the WorkOrdersContainer for this query. If the operator can
-   *        be marked as done, do so.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   * @param recursively_check_dependents If an operator is done, should we
-   *        call processOperator on its dependents recursively.
-   **/
-  void processOperator(const dag_node_index index,
-                       const bool recursively_check_dependents);
-
-  /**
    * @brief This function does the following things:
    *        1. Mark the given relational operator as "done".
-   *        2. For all the dependents of this operator, check if all of their
-   *        blocking dependencies are met. If so inform them that the blocking
-   *        dependencies are met.
-   *        3. Check if the given operator is done producing output. If it's
-   *        done, inform the dependents that they won't receive input anymore
-   *        from the given operator.
+   *        2. For all the dependents of this operator, check if the given
+   *        operator is done producing output. If it's done, inform the
+   *        dependents that they won't receive input anymore from the given
+   *        operator.
+   *        3. Check if all of their blocking dependencies are met. If so
+   *        fetch normal work orders.
    *
    * @param index The index of the given relational operator in the DAG.
    **/
   void markOperatorFinished(const dag_node_index index);
 
   /**
-   * @brief Check if all the dependencies of the node at specified index have
-   *        finished their execution.
-   *
-   * @note This function's true return value is a pre-requisite for calling
-   *       getRebuildWorkOrders()
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the dependencies have finished their execution. False
-   *         otherwise.
-   **/
-  inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
-    for (const dag_node_index dependency_index :
-         query_dag_->getDependencies(node_index)) {
-      // If at least one of the dependencies is not met, return false.
-      if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
    * @brief Check if all the blocking dependencies of the node at specified
    *        index have finished their execution.
    *
@@ -229,27 +194,7 @@ class QueryManagerBase {
    **/
   inline bool checkAllBlockingDependenciesMet(
       const dag_node_index node_index) const {
-    for (const dag_node_index blocking_dependency_index :
-         blocking_dependencies_[node_index]) {
-      if (!query_exec_state_->hasExecutionFinished(
-              blocking_dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if the execution of the given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkOperatorExecutionOver(const dag_node_index index) const {
-    return this->checkNormalExecutionOver(index) &&
-           (!checkRebuildRequired(index) || this->checkRebuildOver(index));
+    return blocking_dependencies_[node_index].empty();
   }
 
   /**
@@ -295,7 +240,9 @@ class QueryManagerBase {
   std::vector<std::vector<dag_node_index>> output_consumers_;
 
   // For all nodes, store their pipeline breaking dependencies (if any).
-  std::vector<std::vector<dag_node_index>> blocking_dependencies_;
+  std::vector<std::unordered_set<dag_node_index>> blocking_dependencies_;
+
+  std::vector<dag_node_index> non_dependent_operators_;
 
   std::unique_ptr<QueryExecutionState> query_exec_state_;
 
@@ -338,6 +285,10 @@ class QueryManagerBase {
    **/
   virtual bool checkRebuildOver(const dag_node_index index) const = 0;
 
+  // For all nodes, store their pipeline breaking dependents (if any).
+  std::vector<std::vector<dag_node_index>> blocking_dependents_;
+  std::vector<std::unordered_set<dag_node_index>> non_blocking_dependencies_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 1144e9f..30a1396 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -67,10 +67,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
       bus_(bus),
       normal_workorder_protos_container_(
           new WorkOrderProtosContainer(num_operators_in_dag_)) {
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      processOperator(index, false);
+  // Collect all the workorders from all the non-blocking relational operators in the DAG.
+  for (const dag_node_index index : non_dependent_operators_) {
+    if (!fetchNormalWorkOrders(index)) {
+      DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
+      markOperatorFinished(index);
     }
   }
 
@@ -177,35 +178,22 @@ serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessag
 }
 
 bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorder_protos = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorder_protos_before =
-        normal_workorder_protos_container_->getNumWorkOrderProtos(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)
-            ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorder_protos =
-        (num_pending_workorder_protos_before <
-         normal_workorder_protos_container_->getNumWorkOrderProtos(index));
+  // Do not fetch any work units until all blocking dependencies are met.
+  // The releational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  DCHECK(checkAllBlockingDependenciesMet(index));
+  DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index));
+
+  const size_t num_pending_workorder_protos_before =
+      normal_workorder_protos_container_->getNumWorkOrderProtos(index);
+  const bool done_generation =
+      query_dag_->getNodePayloadMutable(index)
+          ->getAllWorkOrderProtos(normal_workorder_protos_container_.get());
+  if (done_generation) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
   }
-  return generated_new_workorder_protos;
+
+  return (num_pending_workorder_protos_before < normal_workorder_protos_container_->getNumWorkOrderProtos(index));
 }
 
 void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
@@ -225,7 +213,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
        query_dag_->getDependents(op_index)) {
     const dag_node_index dependent_op_index = dependent_link.first;
     if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      processOperator(dependent_op_index, true);
+      fetchNormalWorkOrders(dependent_op_index);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index a021fdd..8d870c6 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -250,8 +250,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
  private:
   bool checkNormalExecutionOver(const dag_node_index index) const override {
-    return (checkAllDependenciesMet(index) &&
-            !normal_workorder_protos_container_->hasWorkOrderProto(index) &&
+    return (!normal_workorder_protos_container_->hasWorkOrderProto(index) &&
             query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
             query_exec_state_->hasDoneGenerationWorkOrders(index));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 82a0de6..2c9f673 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -61,10 +61,11 @@ QueryManagerSingleNode::QueryManagerSingleNode(
       workorders_container_(
           new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
       database_(static_cast<const CatalogDatabase&>(*catalog_database)) {
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      processOperator(index, false);
+  // Collect all the workorders from all the non-blocking relational operators in the DAG.
+  for (const dag_node_index index : non_dependent_operators_) {
+    if (!fetchNormalWorkOrders(index)) {
+      DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
+      markOperatorFinished(index);
     }
   }
 }
@@ -87,38 +88,25 @@ WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
 }
 
 bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorders = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorders_before =
-        workorders_container_->getNumNormalWorkOrders(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
-                                                                   query_context_.get(),
-                                                                   storage_manager_,
-                                                                   foreman_client_id_,
-                                                                   bus_);
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorders =
-        (num_pending_workorders_before <
-         workorders_container_->getNumNormalWorkOrders(index));
+  // Do not fetch any work units until all blocking dependencies are met.
+  // The releational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  DCHECK(checkAllBlockingDependenciesMet(index));
+  DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index));
+
+  const size_t num_pending_workorders_before =
+      workorders_container_->getNumNormalWorkOrders(index);
+  const bool done_generation =
+      query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
+                                                                 query_context_.get(),
+                                                                 storage_manager_,
+                                                                 foreman_client_id_,
+                                                                 bus_);
+  if (done_generation) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
   }
-  return generated_new_workorders;
+
+  return (num_pending_workorders_before < workorders_container_->getNumNormalWorkOrders(index));
 }
 
 bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index f9d038b..a726bbc 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -99,8 +99,7 @@ class QueryManagerSingleNode final : public QueryManagerBase {
 
  private:
   bool checkNormalExecutionOver(const dag_node_index index) const override {
-    return (checkAllDependenciesMet(index) &&
-            !workorders_container_->hasNormalWorkOrder(index) &&
+    return (!workorders_container_->hasNormalWorkOrder(index) &&
             query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
             query_exec_state_->hasDoneGenerationWorkOrders(index));
   }
@@ -108,8 +107,8 @@ class QueryManagerSingleNode final : public QueryManagerBase {
   bool initiateRebuild(const dag_node_index index) override;
 
   bool checkRebuildOver(const dag_node_index index) const override {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           !workorders_container_->hasRebuildWorkOrder(index) &&
+    DCHECK(query_exec_state_->hasRebuildInitiated(index));
+    return !workorders_container_->hasRebuildWorkOrder(index) &&
            (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index e8d5ff8..3c2d9bf 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -542,6 +542,7 @@ class WorkOrdersContainer {
 
   DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
 };
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 19b42ac..dd3f472 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -353,14 +353,14 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
   // This test creates a DAG of a single node. WorkOrders are generated
   // dynamically as pending work orders complete execution, i.e.,
   // getAllWorkOrders() is called multiple times.  getAllWorkOrders() will be
-  // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to
-  // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will
-  // insert no WorkOrder and return true.
+  // called 3 times and 3 work orders will be returned, i.e., 2 calls to
+  // getAllWorkOrders() insert 2 WorkOrder and return false, and the last will
+  // insert 1 WorkOrder and return true.
 
   // TODO(shoban): This test can not be more robust than this because of fixed
   // scaffolding of mocking. If we use gMock, we can do much better.
   const QueryPlan::DAGNodeIndex id =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 3, 3));
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -378,7 +378,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
     unique_ptr<WorkerMessage> worker_message;
     worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
 
-    EXPECT_TRUE(worker_message != nullptr);
+    ASSERT_TRUE(worker_message != nullptr);
     EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
               worker_message->getType());
     EXPECT_EQ(id, worker_message->getRelationalOpIndex());
@@ -391,6 +391,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
     if (i < 2) {
       // Send a message to QueryManager upon workorder completion.
       EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
+      query_manager_->fetchNormalWorkOrders(id);
     } else {
       // Send a message to QueryManager upon workorder completion.
       // Last event.
@@ -511,7 +512,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
   const QueryPlan::DAGNodeIndex id1 =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
   const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 2));
 
   // Create a non-blocking link.
   query_plan_->addDirectDependency(id2, id1, false);
@@ -531,7 +532,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
   EXPECT_EQ(1, op1.getNumWorkOrders());
   EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
 
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
   // op2 will generate workorder only after receiving a streaming input.
   EXPECT_EQ(0, op2.getNumWorkOrders());
   EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
@@ -562,7 +563,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
   EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
 
   // A call to op2's getAllWorkOrders because of the streamed input.
-  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, op2.getNumWorkOrders());
 
   // Place a message of a workorder completion of op1 on Foreman's input queue.
@@ -573,7 +574,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
   EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
 
   // An additional call to op2's getAllWorkOrders because of completion of op1.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(2, op2.getNumWorkOrders());
 
   worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
@@ -620,7 +621,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
   const QueryPlan::DAGNodeIndex id1 =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
   const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
 
   // Create a non-blocking link.
   query_plan_->addDirectDependency(id2, id1, false);
@@ -670,7 +671,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
   EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, op1.getNumWorkOrders());
 
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(0, op2.getNumWorkOrders());
 
   unique_ptr<WorkerMessage> worker_message;
@@ -704,7 +705,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
   EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
   // Based on the streamed input, op2's getAllWorkOrders should produce a
   // workorder.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, op2.getNumWorkOrders());
 
   worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
@@ -734,16 +735,14 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
   // When an operator produces workorders but no output, the QueryManager should
   // check the dependents of this operator to make progress.
   const QueryPlan::DAGNodeIndex kNumNodes = 5;
-  std::vector<QueryPlan::DAGNodeIndex> ids;
-  ids.reserve(kNumNodes);
 
   for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
     if (i == 0) {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
+      query_plan_->addRelationalOperator(new MockOperator(true, false));
     } else {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
+      query_plan_->addRelationalOperator(new MockOperator(true, true));
     }
-    VLOG(3) << ids[i];
+    VLOG(3) << i;
   }
 
   /**
@@ -753,46 +752,47 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
    *
    **/
   for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
-    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
-    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+    query_plan_->addDirectDependency(i + 1, i, false);
+    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(i))
         ->setOutputRelationID(0xdead);
   }
 
   std::vector<const MockOperator*> operators;
   for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(i)));
   }
 
   constructQueryManager();
 
   // operators[0] should have produced a workorder by now.
+  EXPECT_EQ(1, operators[0]->getNumCalls(MockOperator::kGetAllWorkOrders));
   EXPECT_EQ(1, operators[0]->getNumWorkOrders());
 
   unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));
 
   EXPECT_TRUE(worker_message != nullptr);
   EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
             worker_message->getType());
 
-  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+  EXPECT_EQ(0, worker_message->getRelationalOpIndex());
 
   delete worker_message->getWorkOrder();
 
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
-  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(0));
+  EXPECT_FALSE(getOperatorFinishedStatus(0));
 
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders));
+  for (QueryPlan::DAGNodeIndex i = 1; i < kNumNodes; ++i) {
+    EXPECT_EQ(0, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
   }
 
   // Send a message to QueryManager upon workorder (generated by operators[0])
   // completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(0));
 
   for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
-    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+    EXPECT_EQ(0, getNumWorkOrdersInExecution(i));
+    EXPECT_TRUE(getOperatorFinishedStatus(i));
     if (i < kNumNodes - 1) {
       EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
     }