You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/30 20:49:50 UTC
[12/18] incubator-quickstep git commit: Fixed getDenominator method.
More unit tests.
Fixed getDenominator method. More unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/63c12a1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/63c12a1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/63c12a1a
Branch: refs/heads/scheduler++
Commit: 63c12a1a62f7cd760776dc42819dcc2bdadd73f6
Parents: b65cc2f
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/ExecutionStats.hpp | 15 +-
query_execution/Learner.cpp | 46 +++--
query_execution/Learner.hpp | 147 ++++++++++++--
query_execution/ProbabilityStore.hpp | 15 +-
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
}
/**
- * @brief Check if there are any stats present.
+ * @brief Check if there are stats present for at least one active operator.
**/
inline bool hasStats() const {
for (auto it = active_operators_.begin();
it != active_operators_.end();
++it) {
- if (!it->second->hasStatsForOperator()) {
- return false;
+ if (it->second->hasStatsForOperator()) {
+ return true;
}
}
- return true;
+ return false;
}
/**
@@ -109,14 +109,13 @@ class ExecutionStats {
* @param operator_index The operator index which the value belongs to.
**/
void addEntry(std::size_t value, std::size_t operator_index) {
- if (hasOperator(operator_index)) {
- // This is not the first entry for the given operator.
- active_operators_[operator_index]->addEntry(value);
- } else {
+ if (!hasOperator(operator_index)) {
+ // This is the first entry for the given operator.
// Create the OperatorStats object for this operator.
active_operators_[operator_index] =
std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
}
+ active_operators_[operator_index]->addEntry(value);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
DCHECK(current_probabilities_[priority_level] != nullptr);
// As we want the probability of the lone query in this priority level as
// 1, we set the numerator same as denominator.
- const std::size_t numerator =
- current_probabilities_[priority_level]->getDenominator();
- current_probabilities_[priority_level]->addOrUpdateObject(query_id,
- numerator);
+ // TODO(harshad) - Get the mean work order times here too and use that as
+ // the numerator.
+ ExecutionStats *stats = getExecutionStats(query_id);
+ auto query_stats = stats->getCurrentStats();
+ /*const std::size_t numerator =
+ current_probabilities_[priority_level]->getDenominator();*/
+ if (query_stats.second != 0) {
+ const float mean_workorder_time =
+ query_stats.first / static_cast<float>(query_stats.second);
+ if (mean_workorder_time != 0) {
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+ }
+ }
return;
}
// Else, there is more than one query for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
// queries.
DCHECK(mean_workorders_per_query.find(query_id) !=
mean_workorders_per_query.end());
+ DCHECK_NE(mean_workorders_per_query[query_id], 0);
current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
- query_id, mean_workorders_per_query[query_id], denominator);
+ query_id,
+ 1 / static_cast<float>(mean_workorders_per_query[query_id]),
+ denominator);
+ // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
// the older probabilities.
+ // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+ return;
}
}
void Learner::updateProbabilitiesOfAllPriorityLevels() {
- if (!hasFeedbackFromAllPriorityLevels() ||
- has_feedback_from_all_queries_.empty()) {
+ if (!hasFeedbackFromAllPriorityLevels()) {
+ // has_feedback_from_all_queries_.empty()) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Either we don't have enough feedback messages from all the priority
// levels OR there are no active queries in the system.
return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
// Compute the predicted work order execution times for all the levels.
std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
std::size_t sum_active_priorities = 0;
- for (auto priority_iter : has_feedback_from_all_queries_) {
+ for (auto priority_iter = execution_stats_.begin();
+ priority_iter != execution_stats_.end();
+ ++priority_iter) {
std::size_t total_time_curr_level = 0;
- const std::size_t curr_priority_level = priority_iter.first;
+ const std::size_t curr_priority_level = priority_iter->first;
sum_active_priorities += curr_priority_level;
// For each query, find its predicted work order execution time.
const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
for (auto priority_iter = execution_stats_.cbegin();
priority_iter != execution_stats_.cend();
++priority_iter) {
+ DCHECK(!priority_iter->second.empty());
const std::size_t curr_priority_level = priority_iter->first;
sum_priority_levels += curr_priority_level;
priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
new ExecutionStats(FLAGS_max_past_entries_learner)));
// As we are initializing the query, we obviously haven't gotten any
// feedback message for this query. Hence mark the following field as false.
- has_feedback_from_all_queries_[priority_level] = false;
+ // has_feedback_from_all_queries_[priority_level] = false;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
}
void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
execution_stats_.erase(priority_level);
current_probabilities_.erase(priority_level);
probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_.erase(priority_level);
+ // LOG(INFO) << "Removed priority level: " << priority_level;
if (hasActiveQueries()) {
if (static_cast<int>(priority_level) == highest_priority_level_) {
// The priority level to be removed is the highest priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto);
+ /**
+ * @brief Add a query to the Learner.
+ *
+ * @param query_handle The query handle for the new query.
+ **/
void addQuery(const QueryHandle &query_handle) {
initializePriorityLevelIfNotPresent(query_handle.query_priority());
initializeQuery(query_handle);
relearn();
}
+ /**
+ * @brief Remove a query from the Learner.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
void removeQuery(const std::size_t query_id) {
- // Find the iterator to the query in execution_stats_.
DCHECK(isQueryPresent(query_id));
const std::size_t priority_level = getQueryPriority(query_id);
+ // Find the iterator to the query in execution_stats_.
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
// current_probabilities_[priority_level] ProbabilityStore.
current_probabilities_[priority_level]->removeObject(query_id);
}
+ // has_feedback_from_all_queries_[priority_level] = false;
query_id_to_priority_lookup_.erase(query_id);
checkAndRemovePriorityLevel(priority_level);
relearn();
}
- void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+ /**
+ * @brief Remove the stats of a given operator in a given query.
+ **/
+ void removeOperator(const std::size_t query_id,
+ const std::size_t operator_id) {
ExecutionStats *stats = getExecutionStats(query_id);
DCHECK(stats != nullptr);
stats->removeOperator(operator_id);
}
+ /**
+ * @brief Reset the probabilities and start learning again.
+ **/
void relearn() {
if (hasActiveQueries()) {
initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
}
}
+ /**
+ * @brief Check if there are any active queries in the Learner.
+ **/
inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
+ /**
+ * @brief Get the number of active queries in the Learner for the given
+ * priority level.
+ **/
inline const std::size_t getNumActiveQueriesInPriorityLevel(
const std::size_t priority_level) const {
const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
}
}
+ /**
+ * @brief Get the total number of active queries in the Learner.
+ **/
inline const std::size_t getTotalNumActiveQueries() const {
return query_id_to_priority_lookup_.size();
}
@@ -115,6 +146,83 @@ class Learner {
return highest_priority_level_;
}
+ /**
+ * @brief Randomly pick a priority level.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A priority level. If no queries are present in the learner, return
+ * kInvalidPriorityLevel.
+ **/
+ inline const int pickRandomPriorityLevel() const {
+ if (hasActiveQueries()) {
+ const int result = static_cast<int>(
+ probabilities_of_priority_levels_->pickRandomProperty());
+ /*LOG(INFO) << "Random priority level: " << result << " has "
+ << current_probabilities_.find(result)->second->getNumObjects()
+ << " queries";*/
+ return result;
+ } else {
+ return kInvalidPriorityLevel;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from any priority level in the learner.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A query ID. If no queries are present in the learner, return
+ * kInvalidQueryID.
+ **/
+ inline const int pickRandomQuery() const {
+ if (hasActiveQueries()) {
+ const int random_priority_level = pickRandomPriorityLevel();
+ // Note : The valid priority level values are non-zero.
+ DCHECK_GT(random_priority_level, 0);
+ const int result = pickRandomQueryFromPriorityLevel(
+ static_cast<std::size_t>(random_priority_level));
+ // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
+ return result;
+ } else {
+ // LOG(INFO) << "No active query right now";
+ return kInvalidQueryID;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from a given priority level in the learner.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A query ID. If no queries are present for this priority level in
+ * the learner, return kInvalidQueryID.
+ **/
+ inline const int pickRandomQueryFromPriorityLevel(
+ const std::size_t priority_level) const {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (hasActiveQueries()) {
+ if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+ DCHECK(current_probabilities_.at(priority_level) != nullptr);
+ const auto it = current_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ current_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ } else {
+ DCHECK(default_probabilities_.at(priority_level) != nullptr);
+ const auto it = default_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ default_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ }
+ }
+ return kInvalidQueryID;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
**/
inline bool hasFeedbackFromAllQueriesInPriorityLevel(
const std::size_t priority_level) const {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // return has_feedback_from_all_queries_.at(priority_level);
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_.at(priority_level);
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
inline void updateFeedbackFromQueriesInPriorityLevel(
const std::size_t priority_level) {
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
- &stats_vector = execution_stats_.at(priority_level);
+ &stats_vector = execution_stats_[priority_level];
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
DCHECK(stats_vector[i].second != nullptr);
if (!stats_vector[i].second->hasStats()) {
// At least one query has no statistics so far.
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = false;
return;
}
}
// All the queries have at least one execution statistic.
- has_feedback_from_all_queries_[priority_level] = true;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = true;
}
/**
@@ -313,31 +426,36 @@ class Learner {
}
/**
- * @param mean_workorder_per_query A vector of pairs in which:
- * 1st element is mean time per work order
- * 2nd element is the query ID.
+ * @param mean_workorder_per_query An unordered_map in which:
+ * 1st element is the query ID.
+ * 2nd element is mean time per work order
*
* @note If any query has mean work order time as 0, we return 0 as the
* denominator.
*
* @return The denominator to be used for probability calculations.
**/
- inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
- &mean_workorder_per_query) const {
+ inline float calculateDenominator(
+ const std::unordered_map<std::size_t, std::size_t>
+ &mean_workorder_per_query) const {
float denominator = 0;
for (const auto &element : mean_workorder_per_query) {
if (element.second != 0) {
denominator += 1/static_cast<float>(element.second);
- } else {
- return 0;
+ /*} else {
+ return 0;*/
}
}
return denominator;
}
inline bool hasFeedbackFromAllPriorityLevels() const {
- for (auto feedback : has_feedback_from_all_queries_) {
- if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+ // for (auto feedback : has_feedback_from_all_queries_) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ for (auto priority_iter = default_probabilities_.cbegin();
+ priority_iter != default_probabilities_.cend();
+ ++priority_iter) {
+ if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
return false;
}
}
@@ -369,9 +487,10 @@ class Learner {
// ProbabilityStore for probabilities mapped to the priority levels.
std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Key = priority level. Value = A boolean that indicates if we have received
// feedback from all the queries in the given priority level.
- std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
int highest_priority_level_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : common_denominator_(1.0), mt_(std::random_device()()) {}
+ : common_denominator_(1.0) {}
/**
* @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
/**
* @brief Return a randomly chosen property.
*
+ * TODO(harshad) - If it is expensive to create the random device
+ * on every invocation of this function, make it a class variable.
+ * In which case, we won't be able to mark the function as const.
+ *
* @note The random number is uniformly chosen.
**/
- inline const std::size_t pickRandomProperty() {
+ inline const std::size_t pickRandomProperty() const {
std::uniform_real_distribution<float> dist(0.0, 1.0);
- const float chosen_probability = dist(mt_);
+ std::random_device rd;
+ const float chosen_probability = dist(rd);
return getPropertyForProbability(chosen_probability);
}
@@ -260,7 +265,7 @@ class ProbabilityStore {
* or equal to the input cumulative probability.
**/
inline const std::size_t getPropertyForProbability(
- const float key_cumulative_probability) {
+ const float key_cumulative_probability) const {
DCHECK(!cumulative_probabilities_.empty());
// It doesn't matter in which order the objects are arranged in the
// cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
float common_denominator_;
- std::mt19937_64 mt_;
-
DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
// Randomize the orders.
std::random_device rd;
- std::mt19937 g(rd());
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
std::shuffle(priorities_insertion_order.begin(),
priorities_insertion_order.end(),
- g);
+ g1);
std::shuffle(priorities_removal_order.begin(),
priorities_removal_order.end(),
- g);
+ g2);
Learner learner;
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
}
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Note: Priority level should be non-zero, hence we begin from 1.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g1);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // First insert the queries in the order of priorities as defined by
+ // priorities_insertion_order.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // Note that the query ID is kept the same as priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_insertion_order vector.
+ auto find_priority_level_it = std::find(
+ priorities_insertion_order.begin(), it + 1, picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+ }
+
+ // Repeat the tests a few more times.
+ const std::size_t kNumTests = 200;
+ for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_insertion_order vector.
+ auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+ }
+
+ // Now remove the queries in the order of priorities as defined by
+ // priorities_removal_order.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Recall that the query ID is the same as priority level.
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_removal_order vector.
+ auto find_priority_level_it = std::find(
+ it, priorities_removal_order.end(), picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one query for a given priority level.
+
+ // Also, in this test we don't send any completion feedback message to the
+ // learner. Therefore it always refers to the default probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one query for a given priority level.
+
+ // In this test we send completion feedback messages for all the queries
+ // to the learner. Therefore it refers to the current probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Now send one completion feedback message per query to the learner.
+ const std::size_t kOperatorID = 0;
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ // LOG(INFO) << "Completion message for query : " << *it;
+ learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ // LOG(INFO) << "Picked query ID: " << picked_query_id << "\n";
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ // LOG(INFO) << "Removed query ID: " << *it;
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
} // namespace quickstep