You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ha...@apache.org on 2016/06/15 19:48:50 UTC
[47/50] [abbrv] incubator-quickstep git commit: BugFix: Update NumQueuedWorkOrders to fix scheduling
BugFix: Update NumQueuedWorkOrders to fix scheduling
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/49316237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/49316237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/49316237
Branch: refs/heads/decimal-type
Commit: 49316237f54cfff6e2ea92fe3c4333f35d46190f
Parents: dd44958
Author: Navneet Potti <na...@apache.org>
Authored: Tue Jun 14 21:52:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 15 00:20:34 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/Foreman.cpp | 8 ++++++--
query_execution/PolicyEnforcer.cpp | 2 ++
query_execution/PolicyEnforcer.hpp | 4 ++++
4 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 323e4a9..501166e 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -93,6 +93,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManager
+ quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_relationaloperators_WorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 0577e20..828834d 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -89,6 +89,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
num_numa_nodes,
catalog_database_,
storage_manager_,
+ worker_directory_,
bus_));
}
@@ -115,6 +116,7 @@ void Foreman::run() {
policy_enforcer_->processMessage(tagged_message);
break;
}
+
case kAdmitRequestMessage: {
const AdmitRequestMessage *msg =
static_cast<const AdmitRequestMessage *>(tagged_message.message());
@@ -195,9 +197,11 @@ void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &me
if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
*message);
+ worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
} else {
- sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
- *message);
+ const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+ sendWorkerMessage(least_loaded_worker_thread_index, *message);
+ worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4501026 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -27,6 +27,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -72,6 +73,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
break;
}
case kCatalogRelationNewBlockMessage: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 5915b79..9f87056 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -40,6 +40,7 @@ namespace quickstep {
class CatalogDatabaseLite;
class QueryHandle;
class StorageManager;
+class WorkerDirectory;
/**
* @brief A class that ensures that a high level policy is maintained
@@ -60,11 +61,13 @@ class PolicyEnforcer {
const std::size_t num_numa_nodes,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
+ WorkerDirectory *worker_directory,
tmb::MessageBus *bus)
: foreman_client_id_(foreman_client_id),
num_numa_nodes_(num_numa_nodes),
catalog_database_(catalog_database),
storage_manager_(storage_manager),
+ worker_directory_(worker_directory),
bus_(bus) {}
/**
@@ -148,6 +151,7 @@ class PolicyEnforcer {
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
+ WorkerDirectory *worker_directory_;
tmb::MessageBus *bus_;