You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:27 UTC
[35/50] incubator-quickstep git commit: Used multiple Shiftbosses in
the distributed unit tests.
Used multiple Shiftbosses in the distributed unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c4f4b285
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c4f4b285
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c4f4b285
Branch: refs/heads/quickstep_partition_parser_support
Commit: c4f4b285b63e8a3d6bb8fd312f6d00cc8f1cbd45
Parents: 4286d75
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Nov 12 22:31:42 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 11:46:23 2016 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 14 +++---
query_execution/PolicyEnforcerDistributed.cpp | 6 ++-
query_execution/QueryExecutionState.hpp | 53 ++++++++++++++++++++++
query_execution/QueryManagerDistributed.cpp | 35 ++++++++------
query_execution/QueryManagerDistributed.hpp | 4 +-
5 files changed, 88 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index d619657..7dccce4 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -238,17 +238,19 @@ void ForemanDistributed::run() {
bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+ // Collect unless the message is a new-block or a work-order-feedback message.
+ // NOTE(review): Shiftboss capacity is no longer consulted here; confirm intent.
return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
kCatalogRelationNewBlockMessage,
- kWorkOrderFeedbackMessage) &&
- // TODO(zuyu): Multiple Shiftbosses support.
- !shiftboss_directory_.hasReachedCapacity(0);
+ kWorkOrderFeedbackMessage);
}
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+ const size_t num_shiftbosses = shiftboss_directory_.size();
+ size_t shiftboss_index = 0u;
for (const auto &message : messages) {
DCHECK(message != nullptr);
- // TODO(zuyu): Multiple Shiftbosses support.
- sendWorkOrderMessage(0, *message);
- shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+ sendWorkOrderMessage(shiftboss_index, *message);
+ shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
+ // Round-robin; NOTE(review): assumes num_shiftbosses > 0, else '%' divides by zero.
+ // TODO(zuyu): Take data-locality into account for scheduling.
+ shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c06fd86..6e09ea8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -140,8 +140,10 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
- query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders);
- shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+ const size_t shiftboss_index = proto.shiftboss_index();
+ query_manager->processInitiateRebuildResponseMessage(
+ proto.operator_index(), num_rebuild_work_orders, shiftboss_index);
+ shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders);
if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
onQueryCompletion(query_manager);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryExecutionState.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp
index f5281d5..a4273dc 100644
--- a/query_execution/QueryExecutionState.hpp
+++ b/query_execution/QueryExecutionState.hpp
@@ -22,6 +22,12 @@
#include <cstddef>
#include <unordered_map>
+
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <unordered_set>
+#endif // QUICKSTEP_DISTRIBUTED
+
#include <utility>
#include <vector>
@@ -103,6 +109,49 @@ class QueryExecutionState {
}
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Add one Shiftboss's rebuild workorders to the rebuild status of
+ * the given operator, after the rebuild has been initiated.
+ *
+ * @param operator_index The index of the given operator.
+ * @param num_rebuild_workorders The number of rebuild workorders that this
+ * Shiftboss generated for the given operator.
+ * @param shiftboss_index The index of the Shiftboss that reported them.
+ **/
+ void updateRebuildStatus(const std::size_t operator_index,
+ const std::size_t num_rebuild_workorders,
+ const std::size_t shiftboss_index) {
+ DCHECK_LT(operator_index, num_operators_);
+ auto search_res = rebuild_status_.find(operator_index);
+ DCHECK(search_res != rebuild_status_.end() && search_res->second.has_initiated);
+ search_res->second.num_pending_workorders += num_rebuild_workorders;
+ search_res->second.rebuilt_shiftboss_indexes.insert(shiftboss_index);
+ }
+
+ /**
+ * @brief Check whether the rebuild has finished for the given operator.
+ *
+ * @param operator_index The index of the given operator.
+ * @param num_shiftbosses The number of Shiftbosses expected to report.
+ *
+ * @return True once that many distinct Shiftbosses reported and none pending.
+ **/
+ inline bool hasRebuildFinished(const std::size_t operator_index,
+ const std::size_t num_shiftbosses) const {
+ DCHECK_LT(operator_index, num_operators_);
+ const auto search_res = rebuild_status_.find(operator_index);
+ DCHECK(search_res != rebuild_status_.end());
+
+ const auto &rebuild_status = search_res->second;
+ DCHECK(rebuild_status.has_initiated);
+
+ return rebuild_status.rebuilt_shiftboss_indexes.size() == num_shiftbosses &&
+ rebuild_status.num_pending_workorders == 0u;
+ }
+
+#endif // QUICKSTEP_DISTRIBUTED
+
/**
* @brief Check if the rebuild has been initiated for the given operator.
*
@@ -314,6 +363,10 @@ class QueryExecutionState {
// The number of pending rebuild workorders for the operator.
// Valid if and only if 'has_initiated' is true.
std::size_t num_pending_workorders;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ std::unordered_set<std::size_t> rebuilt_shiftboss_indexes;
+#endif // QUICKSTEP_DISTRIBUTED
};
// Key is dag_node_index for which rebuild is required.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 7d45933..20650d0 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -36,7 +36,9 @@
#include "glog/logging.h"
+#include "tmb/address.h"
#include "tmb/id_typedefs.h"
+#include "tmb/tagged_message.h"
using std::free;
using std::malloc;
@@ -125,16 +127,16 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
}
void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
- const std::size_t num_rebuild_work_orders) {
- // TODO(zuyu): Multiple Shiftbosses support.
- query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+ const std::size_t num_rebuild_work_orders,
+ const std::size_t shiftboss_index) {
+ query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
- if (num_rebuild_work_orders != 0u) {
+ if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) {
// Wait for the rebuild work orders to finish.
return;
}
- // No needs for rebuilds.
+ // No rebuild was needed, or the rebuild has finished.
markOperatorFinished(op_index);
for (const std::pair<dag_node_index, bool> &dependent_link :
@@ -168,17 +170,20 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
kInitiateRebuildMessage);
free(proto_bytes);
+ // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+ }
+
LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
- << "') to Shiftboss";
- // TODO(zuyu): Multiple workers support.
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- shiftboss_directory_->getClientId(0),
- move(tagged_msg));
-
- // The negative value indicates that the number of rebuild work orders is to be
- // determined.
- query_exec_state_->setRebuildStatus(index, -1, true);
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+ shiftboss_addresses,
+ move(tagged_msg),
+ bus_);
+
+ query_exec_state_->setRebuildStatus(index, 0, true);
// Wait for Shiftbosses to report the number of rebuild work orders.
return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index e609ab8..f8ac53c 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -73,9 +73,11 @@ class QueryManagerDistributed final : public QueryManagerBase {
* for initiating the rebuild work order.
* @param num_rebuild_work_orders The number of the rebuild work orders
* generated for the operator indexed by 'op_index'.
+ * @param shiftboss_index The index of the Shiftboss that sends the message.
**/
void processInitiateRebuildResponseMessage(const dag_node_index op_index,
- const std::size_t num_rebuild_work_orders);
+ const std::size_t num_rebuild_work_orders,
+ const std::size_t shiftboss_index);
/**
* @brief Get the next normal workorder to be executed, wrapped in a