You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/10 14:45:46 UTC
[25/31] incubator-quickstep git commit: Minor refactored
QueryManagerDistributed.
Minor refactored QueryManagerDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8229994f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8229994f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8229994f
Branch: refs/heads/reorder-partitioned-hash-join
Commit: 8229994ff813b990830a48617d64362aa4c20c63
Parents: 1cfc1c4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 9 14:21:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 14:21:28 2017 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 -
query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++++++--
query_execution/QueryManagerDistributed.cpp | 17 ++++++-----------
query_execution/QueryManagerDistributed.hpp | 15 ++++++++++-----
4 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e26bde0..5ad6999 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -257,7 +257,6 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_QueryManagerBase
- quickstep_queryexecution_ShiftbossDirectory
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_relationaloperators_RelationalOperator
quickstep_relationaloperators_WorkOrder_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index ef5abb0..12d2037 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -131,9 +131,17 @@ bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
// initializes.
initiateQueryInShiftboss(query_handle);
+ const std::size_t num_shiftbosses = shiftboss_directory_->size();
+
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < num_shiftbosses; ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+ }
+
// Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+ admitted_queries_.emplace(query_id,
+ std::make_unique<QueryManagerDistributed>(
+ query_handle, foreman_client_id_, num_shiftbosses, move(shiftboss_addresses), bus_));
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6ac96ab..174c490 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -30,7 +30,6 @@
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
@@ -52,12 +51,14 @@ using std::vector;
namespace quickstep {
QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
- const ShiftbossDirectory *shiftboss_directory,
const tmb::client_id foreman_client_id,
+ const std::size_t num_shiftbosses,
+ tmb::Address &&shiftboss_addresses,
tmb::MessageBus *bus)
: QueryManagerBase(query_handle),
- shiftboss_directory_(shiftboss_directory),
foreman_client_id_(foreman_client_id),
+ num_shiftbosses_(num_shiftbosses),
+ shiftboss_addresses_(move(shiftboss_addresses)),
bus_(bus),
normal_workorder_protos_container_(
new WorkOrderProtosContainer(num_operators_in_dag_)) {
@@ -142,7 +143,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
const std::size_t shiftboss_index) {
query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
- if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) {
+ if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
// Wait for the rebuild work orders to finish.
return;
}
@@ -181,16 +182,10 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
kInitiateRebuildMessage);
free(proto_bytes);
- // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
- tmb::Address shiftboss_addresses;
- for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
- shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
- }
-
DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
<< "') to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
- shiftboss_addresses,
+ shiftboss_addresses_,
move(tagged_msg),
bus_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 631b15a..759fa70 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -32,6 +32,7 @@
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "utility/Macros.hpp"
+#include "tmb/address.h"
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
@@ -39,7 +40,6 @@ namespace tmb { class MessageBus; }
namespace quickstep {
class QueryHandle;
-class ShiftbossDirectory;
namespace serialization { class WorkOrderMessage; }
@@ -57,13 +57,15 @@ class QueryManagerDistributed final : public QueryManagerBase {
* @brief Constructor.
*
* @param query_handle The QueryHandle object for this query.
- * @param shiftboss_directory The ShiftbossDirectory to use.
* @param foreman_client_id The TMB client ID of the foreman thread.
+ * @param num_shiftbosses The number of Shiftbosses for rebuild.
+ * @param shiftboss_addresses The TMB Address of Shiftbosses for rebuild.
* @param bus The TMB used for communication.
**/
QueryManagerDistributed(QueryHandle *query_handle,
- const ShiftbossDirectory *shiftboss_directory,
const tmb::client_id foreman_client_id,
+ const std::size_t num_shiftbosses,
+ tmb::Address &&shiftboss_addresses,
tmb::MessageBus *bus);
~QueryManagerDistributed() override {}
@@ -153,9 +155,12 @@ class QueryManagerDistributed final : public QueryManagerBase {
(query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
- const ShiftbossDirectory *shiftboss_directory_;
-
const tmb::client_id foreman_client_id_;
+
+ // TODO(quickstep-team): deal with Shiftboss failure.
+ const std::size_t num_shiftbosses_;
+ const tmb::Address shiftboss_addresses_;
+
tmb::MessageBus *bus_;
std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;