You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by na...@apache.org on 2016/06/15 02:54:15 UTC
incubator-quickstep git commit: BugFix: Update NumQueuedWorkOrders to fix scheduling
Repository: incubator-quickstep
Updated Branches:
refs/heads/fix_scheduler [created] 74e49fa7f
BugFix: Update NumQueuedWorkOrders to fix scheduling
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/74e49fa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/74e49fa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/74e49fa7
Branch: refs/heads/fix_scheduler
Commit: 74e49fa7f91ab33e20d488ef9923b285214bc04e
Parents: 8230b12
Author: Navneet Potti <na...@apache.org>
Authored: Tue Jun 14 21:52:25 2016 -0500
Committer: Navneet Potti <na...@apache.org>
Committed: Tue Jun 14 21:52:25 2016 -0500
----------------------------------------------------------------------
query_execution/Foreman.cpp | 10 +++++++---
query_execution/PolicyEnforcer.cpp | 2 ++
query_execution/PolicyEnforcer.hpp | 9 +++++++--
3 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 0577e20..c320272 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -89,7 +89,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
num_numa_nodes,
catalog_database_,
storage_manager_,
- bus_));
+ bus_,
+ worker_directory_));
}
void Foreman::run() {
@@ -115,6 +116,7 @@ void Foreman::run() {
policy_enforcer_->processMessage(tagged_message);
break;
}
+
case kAdmitRequestMessage: {
const AdmitRequestMessage *msg =
static_cast<const AdmitRequestMessage *>(tagged_message.message());
@@ -195,9 +197,11 @@ void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &me
if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
*message);
+ worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
} else {
- sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
- *message);
+ const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+ sendWorkerMessage(least_loaded_worker_thread_index, *message);
+ worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4501026 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -27,6 +27,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -72,6 +73,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
break;
}
case kCatalogRelationNewBlockMessage: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 5915b79..f79c4ee 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -40,6 +40,7 @@ namespace quickstep {
class CatalogDatabaseLite;
class QueryHandle;
class StorageManager;
+class WorkerDirectory;
/**
* @brief A class that ensures that a high level policy is maintained
@@ -60,12 +61,14 @@ class PolicyEnforcer {
const std::size_t num_numa_nodes,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
- tmb::MessageBus *bus)
+ tmb::MessageBus *bus,
+ WorkerDirectory *worker_directory)
: foreman_client_id_(foreman_client_id),
num_numa_nodes_(num_numa_nodes),
catalog_database_(catalog_database),
storage_manager_(storage_manager),
- bus_(bus) {}
+ bus_(bus),
+ worker_directory_(worker_directory) {}
/**
* @brief Destructor.
@@ -151,6 +154,8 @@ class PolicyEnforcer {
tmb::MessageBus *bus_;
+ WorkerDirectory *worker_directory_;
+
// Key = query ID, value = QueryManager* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;