You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 04:41:35 UTC

[05/18] incubator-quickstep git commit: Fixed getDenominator method. More unit tests.

Fixed getDenominator method. More unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/daab9ece
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/daab9ece
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/daab9ece

Branch: refs/heads/scheduler++
Commit: daab9ecea6bf1fa06fa34776c11098d55a82e07c
Parents: d2a6073
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 23:41:05 2016 -0500

----------------------------------------------------------------------
 query_execution/ExecutionStats.hpp         |  15 +-
 query_execution/Learner.cpp                |  46 +++--
 query_execution/Learner.hpp                | 147 ++++++++++++--
 query_execution/ProbabilityStore.hpp       |  15 +-
 query_execution/QueryExecutionTypedefs.hpp |   1 +
 query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
 6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
   }
 
   /**
-   * @brief Check if there are any stats present.
+   * @brief Check if there are stats present for at least one active operator.
    **/
   inline bool hasStats() const {
     for (auto it = active_operators_.begin();
          it != active_operators_.end();
          ++it) {
-      if (!it->second->hasStatsForOperator()) {
-        return false;
+      if (it->second->hasStatsForOperator()) {
+        return true;
       }
     }
-    return true;
+    return false;
   }
 
   /**
@@ -109,14 +109,13 @@ class ExecutionStats {
    * @param operator_index The operator index which the value belongs to.
    **/
   void addEntry(std::size_t value, std::size_t operator_index) {
-    if (hasOperator(operator_index)) {
-      // This is not the first entry for the given operator.
-      active_operators_[operator_index]->addEntry(value);
-    } else {
+    if (!hasOperator(operator_index)) {
+      // This is the first entry for the given operator.
       // Create the OperatorStats object for this operator.
       active_operators_[operator_index] =
           std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
     }
+    active_operators_[operator_index]->addEntry(value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     DCHECK(current_probabilities_[priority_level] != nullptr);
     // As we want the probability of the lone query in this priority level as
     // 1, we set the numerator same as denominator.
-    const std::size_t numerator =
-        current_probabilities_[priority_level]->getDenominator();
-    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
-                                                              numerator);
+    // TODO(harshad) - Get the mean work order times here too and use that as
+    // the numerator.
+    ExecutionStats *stats = getExecutionStats(query_id);
+    auto query_stats = stats->getCurrentStats();
+    /*const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();*/
+    if (query_stats.second != 0) {
+      const float mean_workorder_time =
+          query_stats.first / static_cast<float>(query_stats.second);
+      if (mean_workorder_time != 0) {
+        current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+            query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+      }
+    }
     return;
   }
   // Else, there are more than one queries for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     // queries.
     DCHECK(mean_workorders_per_query.find(query_id) !=
            mean_workorders_per_query.end());
+    DCHECK_NE(mean_workorders_per_query[query_id], 0);
     current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
-        query_id, mean_workorders_per_query[query_id], denominator);
+        query_id,
+        1 / static_cast<float>(mean_workorders_per_query[query_id]),
+        denominator);
+    // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use
     // the older probabilities.
+    // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+    return;
   }
 }
 
 void Learner::updateProbabilitiesOfAllPriorityLevels() {
-  if (!hasFeedbackFromAllPriorityLevels() ||
-      has_feedback_from_all_queries_.empty()) {
+  if (!hasFeedbackFromAllPriorityLevels()) {
+      // has_feedback_from_all_queries_.empty()) {
+      // NOTE(harshad) : Not using this cache as it gets confusing.
     // Either we don't have enough feedback messages from all the priority
     // levels OR there are no active queries in the system.
     return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
   // Compute the predicted work order execution times for all the level.
   std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
   std::size_t sum_active_priorities = 0;
-  for (auto priority_iter : has_feedback_from_all_queries_) {
+  for (auto priority_iter = execution_stats_.begin();
+       priority_iter != execution_stats_.end();
+       ++priority_iter) {
     std::size_t total_time_curr_level = 0;
-    const std::size_t curr_priority_level = priority_iter.first;
+    const std::size_t curr_priority_level = priority_iter->first;
     sum_active_priorities += curr_priority_level;
     // For each query, find its predicted work order execution time.
     const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   for (auto priority_iter = execution_stats_.cbegin();
        priority_iter != execution_stats_.cend();
        ++priority_iter) {
+    DCHECK(!priority_iter->second.empty());
     const std::size_t curr_priority_level = priority_iter->first;
     sum_priority_levels += curr_priority_level;
     priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
           new ExecutionStats(FLAGS_max_past_entries_learner)));
   // As we are initializing the query, we obviously haven't gotten any
   // feedback message for this query. Hence mark the following field as false.
-  has_feedback_from_all_queries_[priority_level] = false;
+  // has_feedback_from_all_queries_[priority_level] = false;
+  // NOTE(harshad) : Not using this cache as it gets confusing.
 }
 
 void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     execution_stats_.erase(priority_level);
     current_probabilities_.erase(priority_level);
     probabilities_of_priority_levels_->removeObject(priority_level);
-    has_feedback_from_all_queries_.erase(priority_level);
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // has_feedback_from_all_queries_.erase(priority_level);
+    // LOG(INFO) << "Removed priority level: " << priority_level;
     if (hasActiveQueries()) {
       if (static_cast<int>(priority_level) == highest_priority_level_) {
         // The priority level to be removed is the highest priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
       const serialization::NormalWorkOrderCompletionMessage
           &workorder_completion_proto);
 
+  /**
+   * @brief Add a query to the Learner.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
   void addQuery(const QueryHandle &query_handle) {
     initializePriorityLevelIfNotPresent(query_handle.query_priority());
     initializeQuery(query_handle);
     relearn();
   }
 
+  /**
+   * @brief Remove a query from the Learner.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
   void removeQuery(const std::size_t query_id) {
-    // Find the iterator to the query in execution_stats_.
     DCHECK(isQueryPresent(query_id));
     const std::size_t priority_level = getQueryPriority(query_id);
+    // Find the iterator to the query in execution_stats_.
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
     DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
       // current_probabilities_[priority_level] ProbabilityStore.
       current_probabilities_[priority_level]->removeObject(query_id);
     }
+    // has_feedback_from_all_queries_[priority_level] = false;
     query_id_to_priority_lookup_.erase(query_id);
     checkAndRemovePriorityLevel(priority_level);
     relearn();
   }
 
-  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+  /**
+   * @brief Remove the stats of a given operator in a given query.
+   **/
+  void removeOperator(const std::size_t query_id,
+                      const std::size_t operator_id) {
     ExecutionStats *stats = getExecutionStats(query_id);
     DCHECK(stats != nullptr);
     stats->removeOperator(operator_id);
   }
 
+  /**
+   * @brief Reset the probabilities and start learning again.
+   **/
   void relearn() {
     if (hasActiveQueries()) {
       initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Check if there are any active queries in the Learner.
+   **/
   inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
 
+  /**
+   * @brief Get the number of active queries in the Learner for the given
+   *        priority level.
+   **/
   inline const std::size_t getNumActiveQueriesInPriorityLevel(
       const std::size_t priority_level) const {
     const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Get the total number of active queries in the Learner.
+   **/
   inline const std::size_t getTotalNumActiveQueries() const {
     return query_id_to_priority_lookup_.size();
   }
@@ -115,6 +146,83 @@ class Learner {
     return highest_priority_level_;
   }
 
+  /**
+   * @brief Randomly pick a priority level.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A priority level. If no queries are present in the learner, return
+   *         kInvalidPriorityLevel.
+   **/
+  inline const int pickRandomPriorityLevel() const {
+    if (hasActiveQueries()) {
+      const int result = static_cast<int>(
+          probabilities_of_priority_levels_->pickRandomProperty());
+      /*LOG(INFO) << "Random priority level: " << result << " has "
+                << current_probabilities_.find(result)->second->getNumObjects()
+                << " queries";*/
+      return result;
+    } else {
+      return kInvalidPriorityLevel;
+    }
+  }
+
+  /**
+   * @brief Randomly pick a query from any priority level in the learner.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A query ID. If no queries are present in the learner, return
+   *         kInvalidQueryID.
+   **/
+  inline const int pickRandomQuery() const {
+    if (hasActiveQueries()) {
+      const int random_priority_level = pickRandomPriorityLevel();
+      // Note : The valid priority level values are non-zero.
+      DCHECK_GT(random_priority_level, 0);
+      const int result = pickRandomQueryFromPriorityLevel(
+          static_cast<std::size_t>(random_priority_level));
+      // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
+      return result;
+    } else {
+      // LOG(INFO) << "No active query right now";
+      return kInvalidQueryID;
+    }
+  }
+
+  /**
+   * @brief Randomly pick a query from a given priority level in the learner.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A query ID. If no queries are present for this priority level in
+   *         the learner, return kInvalidQueryID.
+   **/
+  inline const int pickRandomQueryFromPriorityLevel(
+      const std::size_t priority_level) const {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (hasActiveQueries()) {
+      if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+        DCHECK(current_probabilities_.at(priority_level) != nullptr);
+        const auto it = current_probabilities_.find(priority_level);
+        if (it->second->getNumObjects() > 0) {
+          return static_cast<int>(
+              current_probabilities_.at(priority_level)->pickRandomProperty());
+        }
+        // LOG(INFO) << "No queries in priority level: " << priority_level;
+      } else {
+        DCHECK(default_probabilities_.at(priority_level) != nullptr);
+        const auto it = default_probabilities_.find(priority_level);
+        if (it->second->getNumObjects() > 0) {
+          return static_cast<int>(
+              default_probabilities_.at(priority_level)->pickRandomProperty());
+        }
+        // LOG(INFO) << "No queries in priority level: " << priority_level;
+      }
+    }
+    return kInvalidQueryID;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
    **/
   inline bool hasFeedbackFromAllQueriesInPriorityLevel(
       const std::size_t priority_level) const {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // return has_feedback_from_all_queries_.at(priority_level);
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_.at(priority_level);
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
   inline void updateFeedbackFromQueriesInPriorityLevel(
       const std::size_t priority_level) {
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
-        &stats_vector = execution_stats_.at(priority_level);
+        &stats_vector = execution_stats_[priority_level];
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
       DCHECK(stats_vector[i].second != nullptr);
       if (!stats_vector[i].second->hasStats()) {
         // At least one query has no statistics so far.
+        // NOTE(harshad) : Not using this cache as it gets confusing.
+        // has_feedback_from_all_queries_[priority_level] = false;
         return;
       }
     }
     // All the queries have at least one execution statistic.
-    has_feedback_from_all_queries_[priority_level] = true;
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // has_feedback_from_all_queries_[priority_level] = true;
   }
 
   /**
@@ -313,31 +426,36 @@ class Learner {
   }
 
   /**
-   * @param mean_workorder_per_query A vector of pairs in which:
-   *        1st element is mean time per work order
-   *        2nd element is the query ID.
+   * @param mean_workorder_per_query An unordered_map in which:
+   *        1st element is the query ID.
+   *        2nd element is mean time per work order
    *
    * @note If any query has mean work order time as 0, we return 0 as the
    *       denominator.
    *
    * @return The denominator to be used for probability calculations.
    **/
-  inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
-                                        &mean_workorder_per_query) const {
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
     float denominator = 0;
     for (const auto &element : mean_workorder_per_query) {
       if (element.second != 0) {
         denominator += 1/static_cast<float>(element.second);
-      } else {
-        return 0;
+      /*} else {
+        return 0;*/
       }
     }
     return denominator;
   }
 
   inline bool hasFeedbackFromAllPriorityLevels() const {
-    for (auto feedback : has_feedback_from_all_queries_) {
-      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+    // for (auto feedback : has_feedback_from_all_queries_) {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    for (auto priority_iter = default_probabilities_.cbegin();
+         priority_iter != default_probabilities_.cend();
+         ++priority_iter) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
         return false;
       }
     }
@@ -369,9 +487,10 @@ class Learner {
   // ProbabilityStrore for probabilities mapped to the priority levels.
   std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
 
+  // NOTE(harshad) : Not using this cache as it gets confusing.
   // Key = priority level. Value = A boolean that indicates if we have received
   // feedback from all the queries in the given priority level.
-  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+  // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
   int highest_priority_level_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : common_denominator_(1.0), mt_(std::random_device()()) {}
+      : common_denominator_(1.0) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
   /**
    * @brief Return a randomly chosen property.
    *
+   * TODO(harshad) - If it is expensive to create the random device
+   * on every invocation of this function, make it a class variable.
+   * In that case, we won't be able to mark the function as const.
+   *
    * @note The random number is uniformly chosen.
    **/
-  inline const std::size_t pickRandomProperty() {
+  inline const std::size_t pickRandomProperty() const {
     std::uniform_real_distribution<float> dist(0.0, 1.0);
-    const float chosen_probability = dist(mt_);
+    std::random_device rd;
+    const float chosen_probability = dist(rd);
     return getPropertyForProbability(chosen_probability);
   }
 
@@ -260,7 +265,7 @@ class ProbabilityStore {
    *         or equal to the input cumulative probability.
    **/
   inline const std::size_t getPropertyForProbability(
-      const float key_cumulative_probability) {
+      const float key_cumulative_probability) const {
     DCHECK(!cumulative_probabilities_.empty());
     // It doesn't matter in which order the objects are arranged in the
     // cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
 
   float common_denominator_;
 
-  std::mt19937_64 mt_;
-
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
 const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
 
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/daab9ece/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
 
   // Randomize the orders.
   std::random_device rd;
-  std::mt19937 g(rd());
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
 
   std::shuffle(priorities_insertion_order.begin(),
                priorities_insertion_order.end(),
-               g);
+               g1);
 
   std::shuffle(priorities_removal_order.begin(),
                priorities_removal_order.end(),
-               g);
+               g2);
 
   Learner learner;
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
 }
 
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g1);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_insertion_order vector.
+    auto find_priority_level_it = std::find(
+        priorities_insertion_order.begin(), it + 1, picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+  }
+
+  // Repeat the tests a few more times.
+  const std::size_t kNumTests = 200;
+  for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_insertion_order vector.
+    auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+                                            priorities_insertion_order.end(),
+                                            picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+  }
+
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_removal_order vector.
+    auto find_priority_level_it = std::find(
+        it, priorities_removal_order.end(), picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one query for a given priority level.
+
+  // Also, in this test we don't send any completion feedback message to the
+  // learner. Therefore it always refers to the default probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one query for a given priority level.
+
+  // In this test we send completion feedback messages for all the queries
+  // to the learner. Therefore it refers to the current probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Now send one completion feedback message per query to the learner.
+  const std::size_t kOperatorID = 0;
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    // LOG(INFO) << "Completion message for query : " << *it;
+    learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    // LOG(INFO) << "Picked query ID: " << picked_query_id << "\n";
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+    // LOG(INFO) << "Removed query ID: " << *it;
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
 }  // namespace quickstep