You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/29 21:06:45 UTC

[16/19] incubator-quickstep git commit: Select query from learner in PolicyEnforcer.

Select query from learner in PolicyEnforcer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a70b1eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a70b1eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a70b1eca

Branch: refs/heads/scheduler++
Commit: a70b1ecadc5daa1ddc9a1302f6ea4874a2b91dbe
Parents: ed93b48
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 12:25:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 29 16:06:07 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                |  5 +-
 query_execution/Learner.hpp                | 31 +++++----
 query_execution/PriorityPolicyEnforcer.cpp | 86 +++++++++++++++++++++----
 query_execution/PriorityPolicyEnforcer.hpp | 19 ++++++
 4 files changed, 112 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a70b1eca/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 11c3735..bb24baa 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -49,7 +49,7 @@ void Learner::addCompletionFeedback(
         &workorder_completion_proto) {
   const std::size_t query_id = workorder_completion_proto.query_id();
   DCHECK(isQueryPresent(query_id));
-  const std::size_t priority_level = getQueryPriority(query_id);
+  const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
   ExecutionStats *execution_stats = getExecutionStats(query_id);
   DCHECK(execution_stats != nullptr);
   execution_stats->addEntry(
@@ -106,12 +106,10 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
         query_id,
         1 / static_cast<float>(mean_workorders_per_query[query_id]),
         denominator);
-    // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use
     // the older probabilities.
-    // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
     return;
   }
 }
@@ -248,7 +246,6 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     probabilities_of_priority_levels_->removeObject(priority_level);
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // has_feedback_from_all_queries_.erase(priority_level);
-    // LOG(INFO) << "Removed priority level: " << priority_level;
     if (hasActiveQueries()) {
       if (static_cast<int>(priority_level) == highest_priority_level_) {
         // The priority level to be removed is the highest priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a70b1eca/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 2c6fdef..8634842 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -70,7 +70,7 @@ class Learner {
    **/
   void removeQuery(const std::size_t query_id) {
     DCHECK(isQueryPresent(query_id));
-    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
     // Find the iterator to the query in execution_stats_.
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
@@ -158,9 +158,6 @@ class Learner {
     if (hasActiveQueries()) {
       const int result = static_cast<int>(
           probabilities_of_priority_levels_->pickRandomProperty());
-      /*LOG(INFO) << "Random priority level: " << result << " has "
-                << current_probabilities_.find(result)->second->getNumObjects()
-                << " queries";*/
       return result;
     } else {
       return kInvalidPriorityLevel;
@@ -182,10 +179,8 @@ class Learner {
       DCHECK_GT(random_priority_level, 0);
       const int result = pickRandomQueryFromPriorityLevel(
           static_cast<std::size_t>(random_priority_level));
-      // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
       return result;
     } else {
-      // LOG(INFO) << "No active query right now";
       return kInvalidQueryID;
     }
   }
@@ -209,7 +204,6 @@ class Learner {
           return static_cast<int>(
               current_probabilities_.at(priority_level)->pickRandomProperty());
         }
-        // LOG(INFO) << "No queries in priority level: " << priority_level;
       } else {
         DCHECK(default_probabilities_.at(priority_level) != nullptr);
         const auto it = default_probabilities_.find(priority_level);
@@ -217,12 +211,24 @@ class Learner {
           return static_cast<int>(
               default_probabilities_.at(priority_level)->pickRandomProperty());
         }
-        // LOG(INFO) << "No queries in priority level: " << priority_level;
       }
     }
     return kInvalidQueryID;
   }
 
+  /**
+   * @brief Return the priority level of the given query if the learner
+   *        tracks it, otherwise kInvalidPriorityLevel. Unlike
+   *        getQueryPriorityUnsafe(), the query ID need not be present.
+   **/
+  inline int getQueryPriority(const std::size_t query_id) const {
+    const auto lookup = query_id_to_priority_lookup_.find(query_id);
+    if (lookup == query_id_to_priority_lookup_.end()) {
+      return kInvalidPriorityLevel;
+    }
+    return lookup->second;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -324,7 +330,7 @@ class Learner {
     if (isQueryPresent(query_id)) {
       const auto stats_iter = getExecutionStatsIterMutable(query_id);
       DCHECK(stats_iter !=
-             std::end(execution_stats_[getQueryPriority(query_id)]));
+             std::end(execution_stats_[getQueryPriorityUnsafe(query_id)]));
       return stats_iter->second.get();
     }
     return nullptr;
@@ -339,7 +345,7 @@ class Learner {
   inline std::vector<
       std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
       getExecutionStatsIterMutable(const std::size_t query_id) {
-    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_[priority_level];
     // The following line uses std::find_if to reach to the desired element
@@ -356,8 +362,11 @@ class Learner {
 
   /**
    * @brief Get a query's priority level given its ID.
+   *
+   * @note This version assumes that the given query ID exists in data
+   *       structures.
    **/
-  inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+  inline const std::size_t getQueryPriorityUnsafe(const std::size_t query_id) const {
     const auto it = query_id_to_priority_lookup_.find(query_id);
     DCHECK(it != query_id_to_priority_lookup_.end());
     return it->second;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a70b1eca/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 44ccb0a..6467367 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -53,6 +53,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
       LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       return true;
     } else {
@@ -71,6 +72,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   // QueryManager, so that we need to extract message from the
   // TaggedMessage only once.
   std::size_t query_id;
+  std::size_t operator_id;
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
       serialization::NormalWorkOrderCompletionMessage proto;
@@ -79,6 +81,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      operator_id = proto.operator_index();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
       learner_->addCompletionFeedback(proto);
@@ -94,6 +97,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      operator_id = proto.operator_index();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
       break;
@@ -132,6 +136,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   const QueryManager::QueryStatusCode return_code =
       admitted_queries_[query_id]->processMessage(tagged_message);
+  // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
@@ -140,6 +145,8 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       waiting_queries_.pop();
       admitQuery(new_query);
     }
+  } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
+    learner_->removeOperator(query_id, operator_id);
   }
 }
 
@@ -161,26 +168,25 @@ void PriorityPolicyEnforcer::getWorkerMessages(
   DCHECK_GT(per_query_share, 0u);
   std::vector<std::size_t> finished_queries_ids;
 
-  for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManager *curr_query_manager = admitted_query_info.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    std::size_t messages_collected_curr_query = 0;
-    while (messages_collected_curr_query < per_query_share) {
+  if (learner_->hasActiveQueries()) {
+    // Key = priority level. Value = whether this level has already been checked.
+    std::unordered_map<std::size_t, bool> checked_priority_levels;
+    // While there are more priority levels to be checked ..
+    while (checked_priority_levels.size() < priority_query_ids_.size()) {
+      const int chosen_priority_level = learner_->pickRandomPriorityLevel();
+      DCHECK(chosen_priority_level != kInvalidPriorityLevel);
       WorkerMessage *next_worker_message =
-          curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+          getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+                                                &finished_queries_ids);
       if (next_worker_message != nullptr) {
-        ++messages_collected_curr_query;
         worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
       } else {
-        // No more work ordes from the current query at this time.
-        // Check if the query's execution is over.
-        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          // If the query has been executed, remove it.
-          finished_queries_ids.push_back(admitted_query_info.first);
-        }
-        break;
+        checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
       }
     }
+  } else {
+    LOG(INFO) << "No active queries in the learner at this point.";
+    return;
   }
   for (const std::size_t finished_qid : finished_queries_ids) {
     removeQuery(finished_qid);
@@ -194,6 +200,22 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
+  // Remove the query from priority_query_ids_ structure.
+  const int query_priority = learner_->getQueryPriority(query_id);
+  DCHECK(query_priority != kInvalidPriorityLevel);
+  const std::size_t query_priority_unsigned =
+      static_cast<std::size_t>(query_priority);
+  std::vector<std::size_t> *query_ids_for_priority_level =
+      &priority_query_ids_[query_priority_unsigned];
+  query_ids_for_priority_level->erase(
+      std::remove(query_ids_for_priority_level->begin(),
+                  query_ids_for_priority_level->end(),
+                  query_id));
+  if (query_ids_for_priority_level->empty()) {
+    // No more queries for the given priority level. Remove the entry.
+    priority_query_ids_.erase(query_priority_unsigned);
+  }
+  // Remove the query from the learner.
   learner_->removeQuery(query_id);
 }
 
@@ -219,4 +241,40 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
       proto.execution_time_in_microseconds());
 }
 
+WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
+    const std::size_t priority_level,
+    std::vector<std::size_t> *finished_queries_ids) {
+  // find() rather than operator[], so an unknown level is not inserted.
+  const auto level_it = priority_query_ids_.find(priority_level);
+  const std::size_t num_queries_in_level =
+      (level_it == priority_query_ids_.end()) ? 0 : level_it->second.size();
+  // Key = query ID in this priority level, value = whether it was checked.
+  std::unordered_map<std::size_t, bool> checked_query_ids;
+  while (checked_query_ids.size() < num_queries_in_level) {
+    const int chosen_query_id = learner_->pickRandomQueryFromPriorityLevel(priority_level);
+    if (chosen_query_id == kInvalidQueryID) {
+      return nullptr;  // No query available at this time in this priority level.
+    }
+    const std::size_t query_id = static_cast<std::size_t>(chosen_query_id);
+    if (checked_query_ids.find(query_id) != checked_query_ids.end()) {
+      continue;  // Already checked this query during this call; draw again.
+    }
+    const auto query_it = admitted_queries_.find(query_id);
+    DCHECK(query_it != admitted_queries_.end());
+    QueryManager *chosen_query_manager = query_it->second.get();
+    WorkerMessage *next_worker_message = chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+    if (next_worker_message != nullptr) {
+      return next_worker_message;
+    }
+    // No WorkerMessage from this query right now. The caller may invoke this
+    // repeatedly before removing queries, so record a finished query once.
+    checked_query_ids[query_id] = true;
+    if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished() &&
+        std::find(finished_queries_ids->begin(), finished_queries_ids->end(), query_id) == finished_queries_ids->end()) {
+      finished_queries_ids->emplace_back(query_id);
+    }
+  }
+  return nullptr;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a70b1eca/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 94cbe38..281c066 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -183,6 +183,21 @@ class PriorityPolicyEnforcer {
   void recordTimeForWorkOrder(
       const serialization::NormalWorkOrderCompletionMessage &proto);
 
+  /**
+   * @brief Get a WorkerMessage from a query in the chosen priority level.
+   *
+   * @param priority_level The priority level from which the query will be
+   *        chosen.
+   * @param finished_queries_ids Output vector; IDs of queries found to have
+   *        finished their execution are appended to it.
+   *
+   * @return A WorkerMessage, ownership of which passes to the caller. If no
+   *         query can be chosen from this priority level, return nullptr.
+   **/
+  WorkerMessage *getNextWorkerMessageFromPriorityLevel(
+      const std::size_t priority_level,
+      std::vector<std::size_t> *finished_queries_ids);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -193,6 +208,10 @@ class PriorityPolicyEnforcer {
   tmb::MessageBus *bus_;
   const bool profile_individual_workorders_;
 
+  // Key = priority level, value = a vector of IDs of the queries belonging to
+  // the key priority level.
+  std::unordered_map<std::size_t, std::vector<std::size_t>> priority_query_ids_;
+
   // Key = query ID, value = QueryManager* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;