You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/07 17:14:51 UTC

incubator-quickstep git commit: Bool flag to toggle between static and dynamic

Repository: incubator-quickstep
Updated Branches:
  refs/heads/scheduler++ bc4d2ced9 -> 475da429e


Bool flag to toggle between static and dynamic


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/475da429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/475da429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/475da429

Branch: refs/heads/scheduler++
Commit: 475da429ee31e9a7e4f84b4c5e2aa2dbcff132f2
Parents: bc4d2ce
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jul 7 12:14:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jul 7 12:14:25 2016 -0500

----------------------------------------------------------------------
 query_execution/Foreman.cpp                |  2 +-
 query_execution/Learner.cpp                |  2 +-
 query_execution/Learner.hpp                | 60 ++++++++++++++++++-------
 query_execution/PriorityPolicyEnforcer.cpp | 28 +++++++++++-
 query_execution/PriorityPolicyEnforcer.hpp | 11 +----
 5 files changed, 73 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index a5e8894..40d6e5c 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -46,7 +46,7 @@ using std::vector;
 
 namespace quickstep {
 
-DEFINE_uint64(min_load_per_worker, 1, "The minimum load defined as the number "
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 56e9626..ed5b394 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -52,13 +52,13 @@ void Learner::addCompletionFeedback(
         &workorder_completion_proto) {
   const std::size_t query_id = workorder_completion_proto.query_id();
   DCHECK(isQueryPresent(query_id));
-  const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
   ExecutionStats *execution_stats = getExecutionStats(query_id);
   DCHECK(execution_stats != nullptr);
   execution_stats->addEntry(
       workorder_completion_proto.execution_time_in_microseconds(),
       workorder_completion_proto.operator_index());
 
+  const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
   if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
     updateFeedbackFromQueriesInPriorityLevel(priority_level);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8654544..03c692b 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -40,7 +40,6 @@ namespace quickstep {
 /** \addtogroup QueryExecution
  *  @{
  */
-
 class Learner {
  public:
   /**
@@ -151,6 +150,8 @@ class Learner {
    *
    * @note We use uniform random distribution.
    *
+   * TODO(harshad) - Use a default_probabilities_of_priority_levels data structure.
+   *
    * @return A priority level. If no queries are present in the learner, return
    *         kInvalidPriorityLevel.
    **/
@@ -177,7 +178,8 @@ class Learner {
       const int random_priority_level = pickRandomPriorityLevel();
       // Note : The valid priority level values are non-zero.
       DCHECK_GT(random_priority_level, 0);
-      const int result = pickRandomQueryFromPriorityLevel(
+      // TODO(harshad) - Make this function a template too.
+      const int result = pickRandomQueryFromPriorityLevel<true>(
           static_cast<std::size_t>(random_priority_level));
       return result;
     } else {
@@ -193,25 +195,51 @@ class Learner {
    * @return A query ID. If no queries are present for this priority level in
    *         the learner, return kInvalidQueryID.
    **/
+  template <bool dynamic_probabilities>
   inline const int pickRandomQueryFromPriorityLevel(
       const std::size_t priority_level) const {
     DCHECK(isPriorityLevelPresent(priority_level));
-    if (hasActiveQueries()) {
-      if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
-        DCHECK(current_probabilities_.at(priority_level) != nullptr);
-        const auto it = current_probabilities_.find(priority_level);
-        if (it->second->getNumObjects() > 0) {
-          return static_cast<int>(
-              current_probabilities_.at(priority_level)->pickRandomProperty());
-        }
-      } else {
-        DCHECK(default_probabilities_.at(priority_level) != nullptr);
-        const auto it = default_probabilities_.find(priority_level);
-        if (it->second->getNumObjects() > 0) {
-          return static_cast<int>(
-              default_probabilities_.at(priority_level)->pickRandomProperty());
+    if (dynamic_probabilities) {
+      if (hasActiveQueries()) {
+        if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+          DCHECK(current_probabilities_.at(priority_level) != nullptr);
+          const auto it = current_probabilities_.find(priority_level);
+          if (it->second->getNumObjects() > 0) {
+            return static_cast<int>(
+                current_probabilities_.at(priority_level)->pickRandomProperty());
+          }
+        } else {
+          DCHECK(default_probabilities_.at(priority_level) != nullptr);
+          const auto it = default_probabilities_.find(priority_level);
+          if (it->second->getNumObjects() > 0) {
+            return static_cast<int>(
+                default_probabilities_.at(priority_level)->pickRandomProperty());
+          }
         }
       }
+      return kInvalidQueryID;
+    } else {
+      return pickRandomQueryFromPriorityLevelDefaultProbs(priority_level);
+    }
+  }
+
+  /**
+   * @brief Randomly pick a query from a given priority level using the default
+   *        probabilities.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A query ID. If no queries are present for this priority level in
+   *         the learner, return kInvalidQueryID.
+   **/
+  inline const int pickRandomQueryFromPriorityLevelDefaultProbs(const std::size_t priority_level) const {
+    if (hasActiveQueries()) {
+      DCHECK(default_probabilities_.at(priority_level) != nullptr);
+      const auto it = default_probabilities_.find(priority_level);
+      if (it->second->getNumObjects() > 0) {
+        return static_cast<int>(
+            default_probabilities_.at(priority_level)->pickRandomProperty());
+      }
     }
     return kInvalidQueryID;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 7d0d46f..36e2624 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -38,10 +38,29 @@
 
 namespace quickstep {
 
-DEFINE_uint64(max_msgs_per_dispatch_round, 80, "Maximum number of messages that"
+DEFINE_uint64(max_msgs_per_dispatch_round, 40, "Maximum number of messages that"
               " can be allocated in a single round of dispatch of messages to"
               " the workers.");
 
+DEFINE_bool(dynamic_probabilities_in_learner, true, "Whether the learner should have dynamic probabilities or static probabilities");
+
+PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_client_id,
+                       const std::size_t num_numa_nodes,
+                       CatalogDatabaseLite *catalog_database,
+                       StorageManager *storage_manager,
+                       WorkerDirectory *worker_directory,
+                       tmb::MessageBus *bus,
+                       const bool profile_individual_workorders)
+    : foreman_client_id_(foreman_client_id),
+      num_numa_nodes_(num_numa_nodes),
+      catalog_database_(catalog_database),
+      storage_manager_(storage_manager),
+      worker_directory_(worker_directory),
+      bus_(bus),
+      profile_individual_workorders_(profile_individual_workorders) {
+  learner_.reset(new Learner());
+}
+
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
@@ -253,7 +272,12 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
   std::unordered_map<std::size_t, bool> checked_query_ids;
   // While there are more queries to be checked ..
   while (checked_query_ids.size() < priority_query_ids_[priority_level].size()) {
-    const int chosen_query_id = learner_->pickRandomQueryFromPriorityLevel(priority_level);
+    int chosen_query_id;
+    if (FLAGS_dynamic_probabilities_in_learner) {
+      chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<true>(priority_level);
+    } else {
+      chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<false>(priority_level);
+    }
     if (chosen_query_id == kInvalidQueryID) {
       // No query available at this time in this priority level.
       return nullptr;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 1f13a10..b20d831 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -65,16 +65,7 @@ class PriorityPolicyEnforcer {
                          StorageManager *storage_manager,
                          WorkerDirectory *worker_directory,
                          tmb::MessageBus *bus,
-                         const bool profile_individual_workorders = false)
-      : foreman_client_id_(foreman_client_id),
-        num_numa_nodes_(num_numa_nodes),
-        catalog_database_(catalog_database),
-        storage_manager_(storage_manager),
-        worker_directory_(worker_directory),
-        bus_(bus),
-        profile_individual_workorders_(profile_individual_workorders) {
-    learner_.reset(new Learner());
-  }
+                         const bool profile_individual_workorders = false);
 
   /**
    * @brief Destructor.