You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ha...@apache.org on 2016/06/15 19:48:50 UTC

[47/50] [abbrv] incubator-quickstep git commit: BugFix: Update NumQueuedWorkOrders to fix scheduling

BugFix: Update NumQueuedWorkOrders to fix scheduling


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/49316237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/49316237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/49316237

Branch: refs/heads/decimal-type
Commit: 49316237f54cfff6e2ea92fe3c4333f35d46190f
Parents: dd44958
Author: Navneet Potti <na...@apache.org>
Authored: Tue Jun 14 21:52:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 15 00:20:34 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt     | 1 +
 query_execution/Foreman.cpp        | 8 ++++++--
 query_execution/PolicyEnforcer.cpp | 2 ++
 query_execution/PolicyEnforcer.hpp | 4 ++++
 4 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 323e4a9..501166e 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -93,6 +93,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_WorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 0577e20..828834d 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -89,6 +89,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       num_numa_nodes,
       catalog_database_,
       storage_manager_,
+      worker_directory_,
       bus_));
 }
 
@@ -115,6 +116,7 @@ void Foreman::run() {
         policy_enforcer_->processMessage(tagged_message);
         break;
       }
+
       case kAdmitRequestMessage: {
         const AdmitRequestMessage *msg =
             static_cast<const AdmitRequestMessage *>(tagged_message.message());
@@ -195,9 +197,11 @@ void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &me
     if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) {
       sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index),
                         *message);
+      worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index);
     } else {
-      sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first,
-                        *message);
+      const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first;
+      sendWorkerMessage(least_loaded_worker_thread_index, *message);
+      worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4501026 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
 
@@ -72,6 +73,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
       break;
     }
     case kCatalogRelationNewBlockMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 5915b79..9f87056 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -40,6 +40,7 @@ namespace quickstep {
 class CatalogDatabaseLite;
 class QueryHandle;
 class StorageManager;
+class WorkerDirectory;
 
 /**
  * @brief A class that ensures that a high level policy is maintained
@@ -60,11 +61,13 @@ class PolicyEnforcer {
                  const std::size_t num_numa_nodes,
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
+                 WorkerDirectory *worker_directory,
                  tmb::MessageBus *bus)
       : foreman_client_id_(foreman_client_id),
         num_numa_nodes_(num_numa_nodes),
         catalog_database_(catalog_database),
         storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
         bus_(bus) {}
 
   /**
@@ -148,6 +151,7 @@ class PolicyEnforcer {
 
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
 
   tmb::MessageBus *bus_;