You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/28 20:25:03 UTC

[2/3] incubator-quickstep git commit: Process InitRebuildResponse message.

Process InitRebuildResponse message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/53d2dca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/53d2dca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/53d2dca5

Branch: refs/heads/policy-enforcer-dist
Commit: 53d2dca5ffe8cce3010f3e63131cac9ff7d35632
Parents: aaecc76
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Thu Jul 28 11:54:20 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Jul 28 11:57:50 2016 -0700

----------------------------------------------------------------------
 query_execution/PolicyEnforcerBase.cpp       | 18 ++++++++++++++++++
 query_execution/PolicyEnforcerBase.hpp       | 13 +++++++++++++
 query_execution/QueryExecutionMessages.proto |  1 +
 query_execution/QueryManagerBase.hpp         | 14 ++++++++++++++
 query_execution/QueryManagerDistributed.cpp  | 21 +++++++++++++++++++++
 query_execution/QueryManagerDistributed.hpp  |  4 ++++
 6 files changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index d16a502..a0a2c8b 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -29,6 +29,7 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 
@@ -60,6 +61,23 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
       break;
     }
+#ifdef QUICKSTEP_DISTRIBUTED
+    case kInitiateRebuildResponseMessage: {
+      serialization::InitiateRebuildResponseMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+
+      // Check if new work orders are available.
+      admitted_queries_[query_id]->processInitiateRebuildResponseMessage(op_index, num_rebuild_work_orders);
+      incrementNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+      break;
+    }
+#endif  // QUICKSTEP_DISTRIBUTED
     case kRebuildWorkOrderCompleteMessage: {
       serialization::RebuildWorkOrderCompletionMessage proto;
       // Note: This proto message contains the time it took to execute the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 0482ebc..03088ff 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -27,6 +27,7 @@
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -179,6 +180,18 @@ class PolicyEnforcerBase {
    **/
   virtual bool admitQuery(QueryHandle *query_handle) = 0;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Increment the number of queued workorders for the given worker.
+   *
+   * @param worker_index The logical ID of the given worker.
+   * @param num_new_work_orders The number of new work orders that will be
+   *        executed on the worker indexed by 'worker_index'.
+   **/
+  virtual void incrementNumQueuedWorkOrders(const std::size_t worker_index,
+                                            const std::size_t num_new_work_orders) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Decrement the number of queued workorders for the given worker by 1.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..db230e5 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -111,6 +111,7 @@ message InitiateRebuildResponseMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
   required uint64 num_rebuild_work_orders = 3;
+  required uint64 shiftboss_index = 4;
 }
 
 message SaveQueryResultMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..c0c82e5 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,6 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -128,6 +129,19 @@ class QueryManagerBase {
   void processFeedbackMessage(const dag_node_index op_index,
                               const WorkOrder::FeedbackMessage &message);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Process the initiate rebuild work order response message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for initiating the rebuild work order.
+   * @param num_rebuild_work_orders The number of rebuild work orders
+   *        generated for the operator indexed by 'op_index'.
+   **/
+  virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                     const std::size_t num_rebuild_work_orders) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Get the query status after processing an incoming message.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..4272fff 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -119,6 +119,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
   return generated_new_workorder_protos;
 }
 
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                                    const std::size_t num_rebuild_work_orders) {
+  // TODO(zuyu): Multiple workers support.
+  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+
+  if (num_rebuild_work_orders != 0u) {
+    // Wait for the rebuild work orders to finish.
+    return;
+  }
+
+  markOperatorFinished(op_index);
+
+  for (const std::pair<dag_node_index, bool> &dependent_link :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      processOperator(dependent_op_index, true);
+    }
+  }
+}
+
 bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK(checkRebuildRequired(index));
   DCHECK(!checkRebuildInitiated(index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..131cd86 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 
+#include <cstddef>
 #include <memory>
 
 #include "query_execution/QueryExecutionState.hpp"
@@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   bool fetchNormalWorkOrders(const dag_node_index index) override;
 
+  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                             const std::size_t num_rebuild_work_orders) override;
+
  /**
    * @brief Get the next normal workorder to be excuted, wrapped in a
    *        WorkOrderMessage proto.