You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/04 16:59:26 UTC
incubator-quickstep git commit: Processed InitiateRebuild message for
the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 5e5ec9e3b -> 991f7a4eb
Processed InitiateRebuild message for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/991f7a4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/991f7a4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/991f7a4e
Branch: refs/heads/master
Commit: 991f7a4eb3923a9fb43f88e1fe2b5ea057ee92e8
Parents: 5e5ec9e
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Wed Aug 3 15:28:00 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Aug 4 09:45:57 2016 -0700
----------------------------------------------------------------------
query_execution/QueryExecutionMessages.proto | 1 +
query_execution/QueryManagerDistributed.cpp | 23 +++++++++++++++++++++++
query_execution/QueryManagerDistributed.hpp | 14 +++++++++++++-
query_execution/Shiftboss.cpp | 2 ++
4 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 5a089d2..4922042 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -113,6 +113,7 @@ message InitiateRebuildResponseMessage {
required uint64 query_id = 1;
required uint64 operator_index = 2;
required uint64 num_rebuild_work_orders = 3;
+ required uint64 shiftboss_index = 4;
}
message SaveQueryResultMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..5d589d5 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -119,6 +119,28 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
return generated_new_workorder_protos;
}
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) {
+ // TODO(zuyu): Support multiple Shiftbosses; this currently handles a single response per operator.
+ query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);  // Record the reported count; NOTE(review): 'true' presumably marks the rebuild as initiated -- confirm against QueryExecutionState.
+
+ if (num_rebuild_work_orders != 0u) {
+ // Rebuild work orders were generated; return and wait for them to finish.
+ return;
+ }
+
+ // No rebuild work orders were generated, so mark the operator as finished right away.
+ markOperatorFinished(op_index);
+
+ for (const std::pair<dag_node_index, bool> &dependent_link :  // Visit each dependent of the operator that was just marked finished.
+ query_dag_->getDependents(op_index)) {
+ const dag_node_index dependent_op_index = dependent_link.first;
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {  // Only process dependents whose blocking dependencies are all met.
+ processOperator(dependent_op_index, true);
+ }
+ }
+}
+
bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
DCHECK(checkRebuildRequired(index));
DCHECK(!checkRebuildInitiated(index));
@@ -127,6 +149,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
serialization::InitiateRebuildMessage proto;
+ proto.set_query_id(query_id_);
proto.set_operator_index(index);
proto.set_insert_destination_index(op.getInsertDestinationID());
proto.set_relation_id(op.getOutputRelationID());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..5d9be81 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#include <cstddef>
#include <memory>
#include "query_execution/QueryExecutionState.hpp"
@@ -60,7 +61,18 @@ class QueryManagerDistributed final : public QueryManagerBase {
bool fetchNormalWorkOrders(const dag_node_index index) override;
- /**
+ /**
+ * @brief Process the initiate rebuild work order response message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for initiating the rebuild work order.
+ * @param num_rebuild_work_orders The number of the rebuild work orders
+ * generated for the operator indexed by 'op_index'.
+ **/
+ void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders);
+
+ /**
* @brief Get the next normal workorder to be executed, wrapped in a
* WorkOrderMessage proto.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 7f655c6..3f7129b 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -311,6 +311,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
proto.set_query_id(query_id);
proto.set_operator_index(op_index);
proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
+ // TODO(zuyu): Multiple Shiftboss support.
+ proto.set_shiftboss_index(0);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));