You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/02/26 19:15:50 UTC

[09/46] incubator-quickstep git commit: Fixed the distributed version due to query execution engine simplification.

Fixed the distributed version due to query execution engine simplification.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e496cb58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e496cb58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e496cb58

Branch: refs/heads/fix-iwyu
Commit: e496cb58e10d32de9dc83d69ece84df3f5b62747
Parents: 0898a77
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Fri Oct 6 22:33:02 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Fri Oct 6 22:33:02 2017 -0500

----------------------------------------------------------------------
 query_execution/QueryManagerDistributed.cpp | 24 +++++++++---------------
 relational_operators/WorkOrderFactory.cpp   |  4 ++--
 2 files changed, 11 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 30a1396..97b451f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -70,8 +70,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
   // Collect all the workorders from all the non-blocking relational operators in the DAG.
   for (const dag_node_index index : non_dependent_operators_) {
     if (!fetchNormalWorkOrders(index)) {
-      DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
-      markOperatorFinished(index);
+      if (checkRebuildRequired(index)) {
+        initiateRebuild(index);
+      } else {
+        markOperatorFinished(index);
+      }
     }
   }
 
@@ -201,21 +204,12 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
                                                                     const std::size_t shiftboss_index) {
   query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
 
-  if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
-    // Wait for the rebuild work orders to finish.
-    return;
+  if (query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
+    // No needs for rebuilds, or the rebuild has finished.
+    markOperatorFinished(op_index);
   }
 
-  // No needs for rebuilds, or the rebuild has finished.
-  markOperatorFinished(op_index);
-
-  for (const std::pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      fetchNormalWorkOrders(dependent_op_index);
-    }
-  }
+  // Wait for the rebuild work orders to finish.
 }
 
 bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5baa21b..25cc81a 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -741,7 +741,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                  proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
              proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
              proto.HasExtension(serialization::DeleteWorkOrder::operator_index) &&
-             proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
+             proto.HasExtension(serialization::DeleteWorkOrder::partition_id);
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
       return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
@@ -1033,7 +1033,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                  proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
              proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
              proto.HasExtension(serialization::UpdateWorkOrder::block_id) &&
-             proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
+             proto.HasExtension(serialization::UpdateWorkOrder::partition_id);
     }
     case serialization::WINDOW_AGGREGATION: {
       return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&