You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:27 UTC

[35/50] incubator-quickstep git commit: Used multiple Shiftbosses in the distributed unit tests.

Used multiple Shiftbosses in the distributed unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c4f4b285
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c4f4b285
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c4f4b285

Branch: refs/heads/quickstep_partition_parser_support
Commit: c4f4b285b63e8a3d6bb8fd312f6d00cc8f1cbd45
Parents: 4286d75
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Nov 12 22:31:42 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 11:46:23 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 14 +++---
 query_execution/PolicyEnforcerDistributed.cpp |  6 ++-
 query_execution/QueryExecutionState.hpp       | 53 ++++++++++++++++++++++
 query_execution/QueryManagerDistributed.cpp   | 35 ++++++++------
 query_execution/QueryManagerDistributed.hpp   |  4 +-
 5 files changed, 88 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index d619657..7dccce4 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -238,17 +238,19 @@ void ForemanDistributed::run() {
 bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
   return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
                                         kCatalogRelationNewBlockMessage,
-                                        kWorkOrderFeedbackMessage) &&
-         // TODO(zuyu): Multiple Shiftbosses support.
-         !shiftboss_directory_.hasReachedCapacity(0);
+                                        kWorkOrderFeedbackMessage);
 }
 
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  const size_t num_shiftbosses = shiftboss_directory_.size();
+  size_t shiftboss_index = 0u;
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
-    // TODO(zuyu): Multiple Shiftbosses support.
-    sendWorkOrderMessage(0, *message);
-    shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+    sendWorkOrderMessage(shiftboss_index, *message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
+
+    // TODO(zuyu): Take data-locality into account for scheduling.
+    shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c06fd86..6e09ea8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -140,8 +140,10 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
 
   const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
-  query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders);
-  shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+  const size_t shiftboss_index = proto.shiftboss_index();
+  query_manager->processInitiateRebuildResponseMessage(
+      proto.operator_index(), num_rebuild_work_orders, shiftboss_index);
+  shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders);
 
   if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
     onQueryCompletion(query_manager);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryExecutionState.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp
index f5281d5..a4273dc 100644
--- a/query_execution/QueryExecutionState.hpp
+++ b/query_execution/QueryExecutionState.hpp
@@ -22,6 +22,12 @@
 
 #include <cstddef>
 #include <unordered_map>
+
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <unordered_set>
+#endif  // QUICKSTEP_DISTRIBUTED
+
 #include <utility>
 #include <vector>
 
@@ -103,6 +109,49 @@ class QueryExecutionState {
     }
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Update the rebuild status of the given operator with the number
+   *        of pending rebuild work orders, after the rebuild has been initiated.
+   *
+   * @param operator_index The index of the given operator.
+   * @param num_rebuild_workorders The number of rebuild workorders of the given
+   *        operator.
+   * @param shiftboss_index The index of the Shiftboss that reported the rebuild.
+   **/
+  void updateRebuildStatus(const std::size_t operator_index,
+                           const std::size_t num_rebuild_workorders,
+                           const std::size_t shiftboss_index) {
+    DCHECK_LT(operator_index, num_operators_);
+    auto search_res = rebuild_status_.find(operator_index);
+    DCHECK(search_res != rebuild_status_.end() && search_res->second.has_initiated);
+    search_res->second.num_pending_workorders += num_rebuild_workorders;
+    search_res->second.rebuilt_shiftboss_indexes.insert(shiftboss_index);
+  }
+
+  /**
+   * @brief Check if the rebuild has been finished for the given operator.
+   *
+   * @param operator_index The index of the given operator.
+   * @param num_shiftbosses The number of Shiftbosses taking part in the rebuild.
+   *
+   * @return True if the rebuild has been finished, false otherwise.
+   **/
+  inline bool hasRebuildFinished(const std::size_t operator_index,
+                                 const std::size_t num_shiftbosses) const {
+    DCHECK_LT(operator_index, num_operators_);
+    const auto search_res = rebuild_status_.find(operator_index);
+    DCHECK(search_res != rebuild_status_.end());
+
+    const auto &rebuild_status = search_res->second;
+    DCHECK(rebuild_status.has_initiated);
+
+    return rebuild_status.rebuilt_shiftboss_indexes.size() == num_shiftbosses &&
+           rebuild_status.num_pending_workorders == 0u;
+  }
+
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Check if the rebuild has been initiated for the given operator.
    *
@@ -314,6 +363,10 @@ class QueryExecutionState {
     // The number of pending rebuild workorders for the operator.
     // Valid if and only if 'has_initiated' is true.
     std::size_t num_pending_workorders;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+    std::unordered_set<std::size_t> rebuilt_shiftboss_indexes;
+#endif  // QUICKSTEP_DISTRIBUTED
   };
 
   // Key is dag_node_index for which rebuild is required.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 7d45933..20650d0 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -36,7 +36,9 @@
 
 #include "glog/logging.h"
 
+#include "tmb/address.h"
 #include "tmb/id_typedefs.h"
+#include "tmb/tagged_message.h"
 
 using std::free;
 using std::malloc;
@@ -125,16 +127,16 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
 }
 
 void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                                                    const std::size_t num_rebuild_work_orders) {
-  // TODO(zuyu): Multiple Shiftbosses support.
-  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+                                                                    const std::size_t num_rebuild_work_orders,
+                                                                    const std::size_t shiftboss_index) {
+  query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
 
-  if (num_rebuild_work_orders != 0u) {
+  if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) {
     // Wait for the rebuild work orders to finish.
     return;
   }
 
-  // No needs for rebuilds.
+  // No need for rebuilds, or the rebuild has finished.
   markOperatorFinished(op_index);
 
   for (const std::pair<dag_node_index, bool> &dependent_link :
@@ -168,17 +170,20 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
                            kInitiateRebuildMessage);
   free(proto_bytes);
 
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
-            << "') to Shiftboss";
-  // TODO(zuyu): Multiple workers support.
-  QueryExecutionUtil::SendTMBMessage(bus_,
-                                     foreman_client_id_,
-                                     shiftboss_directory_->getClientId(0),
-                                     move(tagged_msg));
-
-  // The negative value indicates that the number of rebuild work orders is to be
-  // determined.
-  query_exec_state_->setRebuildStatus(index, -1, true);
+            << "') to all Shiftbosses";
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                       shiftboss_addresses,
+                                       move(tagged_msg),
+                                       bus_);
+
+  query_exec_state_->setRebuildStatus(index, 0, true);
 
   // Wait for Shiftbosses to report the number of rebuild work orders.
   return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index e609ab8..f8ac53c 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -73,9 +73,11 @@ class QueryManagerDistributed final : public QueryManagerBase {
    *        for initiating the rebuild work order.
    * @param num_rebuild_work_orders The number of the rebuild work orders
    *        generated for the operator indexed by 'op_index'.
+   * @param shiftboss_index The index of the Shiftboss that sends the message.
    **/
   void processInitiateRebuildResponseMessage(const dag_node_index op_index,
-                                             const std::size_t num_rebuild_work_orders);
+                                             const std::size_t num_rebuild_work_orders,
+                                             const std::size_t shiftboss_index);
 
   /**
    * @brief Get the next normal workorder to be excuted, wrapped in a