You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/04 16:59:26 UTC

incubator-quickstep git commit: Processed InitiateRebuild message for the distributed version.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 5e5ec9e3b -> 991f7a4eb


Processed InitiateRebuild message for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/991f7a4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/991f7a4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/991f7a4e

Branch: refs/heads/master
Commit: 991f7a4eb3923a9fb43f88e1fe2b5ea057ee92e8
Parents: 5e5ec9e
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Wed Aug 3 15:28:00 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Aug 4 09:45:57 2016 -0700

----------------------------------------------------------------------
 query_execution/QueryExecutionMessages.proto |  1 +
 query_execution/QueryManagerDistributed.cpp  | 23 +++++++++++++++++++++++
 query_execution/QueryManagerDistributed.hpp  | 14 +++++++++++++-
 query_execution/Shiftboss.cpp                |  2 ++
 4 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 5a089d2..4922042 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -113,6 +113,7 @@ message InitiateRebuildResponseMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
   required uint64 num_rebuild_work_orders = 3;
+  required uint64 shiftboss_index = 4;
 }
 
 message SaveQueryResultMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..5d589d5 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -119,6 +119,28 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
   return generated_new_workorder_protos;
 }
 
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                                    const std::size_t num_rebuild_work_orders) {
+  // Record the reported rebuild work order count for 'op_index'. TODO(zuyu): Multiple Shiftbosses support.
+  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+
+  if (num_rebuild_work_orders != 0u) {
+    // Rebuild work orders are outstanding; nothing more to do here until they finish.
+    return;
+  }
+
+  // No rebuild is needed: mark the operator finished, then process each dependent whose blocking dependencies are met.
+  markOperatorFinished(op_index);
+
+  for (const std::pair<dag_node_index, bool> &dependent_link :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      processOperator(dependent_op_index, true);
+    }
+  }
+}
+
 bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK(checkRebuildRequired(index));
   DCHECK(!checkRebuildInitiated(index));
@@ -127,6 +149,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
 
   serialization::InitiateRebuildMessage proto;
+  proto.set_query_id(query_id_);
   proto.set_operator_index(index);
   proto.set_insert_destination_index(op.getInsertDestinationID());
   proto.set_relation_id(op.getOutputRelationID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..5d9be81 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 
+#include <cstddef>
 #include <memory>
 
 #include "query_execution/QueryExecutionState.hpp"
@@ -60,7 +61,18 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   bool fetchNormalWorkOrders(const dag_node_index index) override;
 
- /**
+  /**
+   * @brief Process the response to an initiate rebuild message and record the operator's rebuild status.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        whose rebuild was initiated.
+   * @param num_rebuild_work_orders The number of the rebuild work orders
+   *        generated for the operator indexed by 'op_index'; if zero, the operator is marked finished.
+   **/
+  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                             const std::size_t num_rebuild_work_orders);
+
+  /**
    * @brief Get the next normal workorder to be excuted, wrapped in a
    *        WorkOrderMessage proto.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/991f7a4e/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 7f655c6..3f7129b 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -311,6 +311,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
   proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
+  // TODO(zuyu): Multiple Shiftboss support.
+  proto.set_shiftboss_index(0);
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));