You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/28 20:25:03 UTC
[2/3] incubator-quickstep git commit: Process InitRebuildResponse
message.
Process InitRebuildResponse message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/53d2dca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/53d2dca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/53d2dca5
Branch: refs/heads/policy-enforcer-dist
Commit: 53d2dca5ffe8cce3010f3e63131cac9ff7d35632
Parents: aaecc76
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Thu Jul 28 11:54:20 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Jul 28 11:57:50 2016 -0700
----------------------------------------------------------------------
query_execution/PolicyEnforcerBase.cpp | 18 ++++++++++++++++++
query_execution/PolicyEnforcerBase.hpp | 13 +++++++++++++
query_execution/QueryExecutionMessages.proto | 1 +
query_execution/QueryManagerBase.hpp | 14 ++++++++++++++
query_execution/QueryManagerDistributed.cpp | 21 +++++++++++++++++++++
query_execution/QueryManagerDistributed.hpp | 4 ++++
6 files changed, 71 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index d16a502..a0a2c8b 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -29,6 +29,7 @@
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -60,6 +61,23 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
break;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ case kInitiateRebuildResponseMessage: {
+ serialization::InitiateRebuildResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+ op_index = proto.operator_index();
+ const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+
+ // Check if new work orders are available.
+ admitted_queries_[query_id]->processInitiateRebuildResponseMessage(op_index, num_rebuild_work_orders);
+ incrementNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+ break;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
case kRebuildWorkOrderCompleteMessage: {
serialization::RebuildWorkOrderCompletionMessage proto;
// Note: This proto message contains the time it took to execute the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 0482ebc..03088ff 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -27,6 +27,7 @@
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -179,6 +180,18 @@ class PolicyEnforcerBase {
**/
virtual bool admitQuery(QueryHandle *query_handle) = 0;
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Increment the number of queued workorders for the given worker.
+ *
+ * @param worker_index The logical ID of the given worker.
+ * @param num_new_work_orders The number of new work orders that will be
+ * executed on the Worker indexed by 'worker_index'.
+ **/
+ virtual void incrementNumQueuedWorkOrders(const std::size_t worker_index,
+ const std::size_t num_new_work_orders) {}
+#endif // QUICKSTEP_DISTRIBUTED
+
/**
* @brief Decrement the number of queued workorders for the given worker by 1.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..db230e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -111,6 +111,7 @@ message InitiateRebuildResponseMessage {
required uint64 query_id = 1;
required uint64 operator_index = 2;
required uint64 num_rebuild_work_orders = 3;
+ required uint64 shiftboss_index = 4;
}
message SaveQueryResultMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..c0c82e5 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,6 +24,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -128,6 +129,19 @@ class QueryManagerBase {
void processFeedbackMessage(const dag_node_index op_index,
const WorkOrder::FeedbackMessage &message);
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Process the initiate rebuild work order response message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for initiating the rebuild work order.
+ * @param num_rebuild_work_orders The number of the rebuild work orders
+ * generated for the operator indexed by 'op_index'.
+ **/
+ virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) {}
+#endif // QUICKSTEP_DISTRIBUTED
+
/**
* @brief Get the query status after processing an incoming message.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..4272fff 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -119,6 +119,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
return generated_new_workorder_protos;
}
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) {  // Handles the rebuild work order count reported for 'op_index'.
+ // TODO(zuyu): Multiple workers support.
+ query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);  // NOTE(review): meaning of the trailing 'true' is not visible here -- confirm against QueryExecutionState.
+
+ if (num_rebuild_work_orders != 0u) {
+ // Wait for the rebuild work orders to finish; the operator is not marked finished on this path.
+ return;
+ }
+
+ markOperatorFinished(op_index);  // Zero rebuild work orders were reported, so the operator is marked finished right away.
+
+ for (const std::pair<dag_node_index, bool> &dependent_link :
+ query_dag_->getDependents(op_index)) {
+ const dag_node_index dependent_op_index = dependent_link.first;  // Only the node index of each link is used; the bool is ignored here.
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {  // Dependents with unmet blocking dependencies are skipped.
+ processOperator(dependent_op_index, true);  // NOTE(review): meaning of the 'true' argument is not visible here -- confirm against processOperator.
+ }
+ }
+}
+
bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
DCHECK(checkRebuildRequired(index));
DCHECK(!checkRebuildInitiated(index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..131cd86 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#include <cstddef>
#include <memory>
#include "query_execution/QueryExecutionState.hpp"
@@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
bool fetchNormalWorkOrders(const dag_node_index index) override;
+ void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) override;
+
/**
* @brief Get the next normal workorder to be excuted, wrapped in a
* WorkOrderMessage proto.