You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/10/09 16:29:15 UTC
incubator-quickstep git commit: Fixed the distributed version due to
query execution engine simplification.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 0898a77be -> e496cb58e
Fixed the distributed version following the query execution engine simplification.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e496cb58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e496cb58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e496cb58
Branch: refs/heads/master
Commit: e496cb58e10d32de9dc83d69ece84df3f5b62747
Parents: 0898a77
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Fri Oct 6 22:33:02 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Fri Oct 6 22:33:02 2017 -0500
----------------------------------------------------------------------
query_execution/QueryManagerDistributed.cpp | 24 +++++++++---------------
relational_operators/WorkOrderFactory.cpp | 4 ++--
2 files changed, 11 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 30a1396..97b451f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -70,8 +70,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
// Collect all the workorders from all the non-blocking relational operators in the DAG.
for (const dag_node_index index : non_dependent_operators_) {
if (!fetchNormalWorkOrders(index)) {
- DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
- markOperatorFinished(index);
+ if (checkRebuildRequired(index)) {
+ initiateRebuild(index);
+ } else {
+ markOperatorFinished(index);
+ }
}
}
@@ -201,21 +204,12 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
const std::size_t shiftboss_index) {
query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
- if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
- // Wait for the rebuild work orders to finish.
- return;
+ if (query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
+ // No needs for rebuilds, or the rebuild has finished.
+ markOperatorFinished(op_index);
}
- // No needs for rebuilds, or the rebuild has finished.
- markOperatorFinished(op_index);
-
- for (const std::pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- fetchNormalWorkOrders(dependent_op_index);
- }
- }
+ // Wait for the rebuild work orders to finish.
}
bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e496cb58/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5baa21b..25cc81a 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -741,7 +741,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
proto.HasExtension(serialization::DeleteWorkOrder::operator_index) &&
- proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
+ proto.HasExtension(serialization::DeleteWorkOrder::partition_id);
}
case serialization::DESTROY_AGGREGATION_STATE: {
return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
@@ -1033,7 +1033,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
proto.HasExtension(serialization::UpdateWorkOrder::block_id) &&
- proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
+ proto.HasExtension(serialization::UpdateWorkOrder::partition_id);
}
case serialization::WINDOW_AGGREGATION: {
return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&