You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/10 14:45:46 UTC

[25/31] incubator-quickstep git commit: Minor refactoring of QueryManagerDistributed.

Minor refactoring of QueryManagerDistributed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8229994f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8229994f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8229994f

Branch: refs/heads/reorder-partitioned-hash-join
Commit: 8229994ff813b990830a48617d64362aa4c20c63
Parents: 1cfc1c4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 9 14:21:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 14:21:28 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  1 -
 query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++++++--
 query_execution/QueryManagerDistributed.cpp   | 17 ++++++-----------
 query_execution/QueryManagerDistributed.hpp   | 15 ++++++++++-----
 4 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e26bde0..5ad6999 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -257,7 +257,6 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_QueryManagerBase
-                        quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryexecution_WorkOrderProtosContainer
                         quickstep_relationaloperators_RelationalOperator
                         quickstep_relationaloperators_WorkOrder_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index ef5abb0..12d2037 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -131,9 +131,17 @@ bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
   // initializes.
   initiateQueryInShiftboss(query_handle);
 
+  const std::size_t num_shiftbosses = shiftboss_directory_->size();
+
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < num_shiftbosses; ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   // Query with the same ID not present, ok to admit.
-  admitted_queries_[query_id].reset(
-      new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+  admitted_queries_.emplace(query_id,
+                            std::make_unique<QueryManagerDistributed>(
+                                query_handle, foreman_client_id_, num_shiftbosses, move(shiftboss_addresses), bus_));
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6ac96ab..174c490 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -30,7 +30,6 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
@@ -52,12 +51,14 @@ using std::vector;
 namespace quickstep {
 
 QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
-                                                 const ShiftbossDirectory *shiftboss_directory,
                                                  const tmb::client_id foreman_client_id,
+                                                 const std::size_t num_shiftbosses,
+                                                 tmb::Address &&shiftboss_addresses,
                                                  tmb::MessageBus *bus)
     : QueryManagerBase(query_handle),
-      shiftboss_directory_(shiftboss_directory),
       foreman_client_id_(foreman_client_id),
+      num_shiftbosses_(num_shiftbosses),
+      shiftboss_addresses_(move(shiftboss_addresses)),
       bus_(bus),
       normal_workorder_protos_container_(
           new WorkOrderProtosContainer(num_operators_in_dag_)) {
@@ -142,7 +143,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
                                                                     const std::size_t shiftboss_index) {
   query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
 
-  if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) {
+  if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
     // Wait for the rebuild work orders to finish.
     return;
   }
@@ -181,16 +182,10 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
                            kInitiateRebuildMessage);
   free(proto_bytes);
 
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  tmb::Address shiftboss_addresses;
-  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
-    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
-  }
-
   DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
              << "') to all Shiftbosses";
   QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
+                                       shiftboss_addresses_,
                                        move(tagged_msg),
                                        bus_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 631b15a..759fa70 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -32,6 +32,7 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "utility/Macros.hpp"
 
+#include "tmb/address.h"
 #include "tmb/id_typedefs.h"
 
 namespace tmb { class MessageBus; }
@@ -39,7 +40,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 
 class QueryHandle;
-class ShiftbossDirectory;
 
 namespace serialization { class WorkOrderMessage; }
 
@@ -57,13 +57,15 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * @brief Constructor.
    *
    * @param query_handle The QueryHandle object for this query.
-   * @param shiftboss_directory The ShiftbossDirectory to use.
    * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param num_shiftbosses The number of Shiftbosses for rebuild.
+   * @param shiftboss_addresses The TMB Address of Shiftbosses for rebuild.
    * @param bus The TMB used for communication.
    **/
   QueryManagerDistributed(QueryHandle *query_handle,
-                          const ShiftbossDirectory *shiftboss_directory,
                           const tmb::client_id foreman_client_id,
+                          const std::size_t num_shiftbosses,
+                          tmb::Address &&shiftboss_addresses,
                           tmb::MessageBus *bus);
 
   ~QueryManagerDistributed() override {}
@@ -153,9 +155,12 @@ class QueryManagerDistributed final : public QueryManagerBase {
            (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
   }
 
-  const ShiftbossDirectory *shiftboss_directory_;
-
   const tmb::client_id foreman_client_id_;
+
+  // TODO(quickstep-team): deal with Shiftboss failure.
+  const std::size_t num_shiftbosses_;
+  const tmb::Address shiftboss_addresses_;
+
   tmb::MessageBus *bus_;
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;