You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by na...@apache.org on 2016/06/15 02:54:15 UTC

incubator-quickstep git commit: BugFix: Update NumQueuedWorkOrders to fix scheduling

Repository: incubator-quickstep
Updated Branches:
  refs/heads/fix_scheduler [created] 74e49fa7f


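BugFix: Update NumQueuedWorkOrders to fix scheduling

Foreman::dispatchWorkerMessages now increments the recipient worker's
queued work order count in the WorkerDirectory after sending each worker
message, and PolicyEnforcer::processMessage decrements that count using the
worker_thread_index carried in the received message proto. PolicyEnforcer
now takes a WorkerDirectory pointer in its constructor for this purpose.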


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/74e49fa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/74e49fa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/74e49fa7

Branch: refs/heads/fix_scheduler
Commit: 74e49fa7f91ab33e20d488ef9923b285214bc04e
Parents: 8230b12
Author: Navneet Potti <na...@apache.org>
Authored: Tue Jun 14 21:52:25 2016 -0500
Committer: Navneet Potti <na...@apache.org>
Committed: Tue Jun 14 21:52:25 2016 -0500

----------------------------------------------------------------------
 query_execution/Foreman.cpp        | 10 +++++++---
 query_execution/PolicyEnforcer.cpp |  2 ++
 query_execution/PolicyEnforcer.hpp |  9 +++++++--
 3 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 0577e20..c320272 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -89,7 +89,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       num_numa_nodes,
       catalog_database_,
       storage_manager_,
-      bus_));
+      bus_,
+      worker_directory_));
 }
 
 void Foreman::run() {
@@ -115,6 +116,7 @@ void Foreman::run() {
         policy_enforcer_->processMessage(tagged_message);
         break;
       }
+
       case kAdmitRequestMessage: {
         const AdmitRequestMessage *msg =
             static_cast<const AdmitRequestMessage *>(tagged_message.message());
@@ -195,9 +197,11 @@ void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &me
     if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
       sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
                         *message);
+      worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
     } else {
-      sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
-                        *message);
+      const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+      sendWorkerMessage(least_loaded_worker_thread_index, *message);
+      worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4501026 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
 
@@ -72,6 +73,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
       break;
     }
     case kCatalogRelationNewBlockMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/74e49fa7/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 5915b79..f79c4ee 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -40,6 +40,7 @@ namespace quickstep {
 class CatalogDatabaseLite;
 class QueryHandle;
 class StorageManager;
+class WorkerDirectory;
 
 /**
  * @brief A class that ensures that a high level policy is maintained
@@ -60,12 +61,14 @@ class PolicyEnforcer {
                  const std::size_t num_numa_nodes,
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
-                 tmb::MessageBus *bus)
+                 tmb::MessageBus *bus,
+                 WorkerDirectory *worker_directory)
       : foreman_client_id_(foreman_client_id),
         num_numa_nodes_(num_numa_nodes),
         catalog_database_(catalog_database),
         storage_manager_(storage_manager),
-        bus_(bus) {}
+        bus_(bus),
+        worker_directory_(worker_directory) {}
 
   /**
    * @brief Destructor.
@@ -151,6 +154,8 @@ class PolicyEnforcer {
 
   tmb::MessageBus *bus_;
 
+  WorkerDirectory *worker_directory_;
+
   // Key = query ID, value = QueryManager* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;