You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 04:41:38 UTC
[08/18] incubator-quickstep git commit: Created Learner class.
Created Learner class.
- Learner keeps track of statistics of concurrent queries
- It maintains the probabilities for individual queries as well as the
priority levels in the system.
- Changes in ProbabilityStore class including addition of numerator,
denominator and more unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3fbda4ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3fbda4ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3fbda4ac
Branch: refs/heads/scheduler++
Commit: 3fbda4ac1c5c93a6a011b2807cd855798e031047
Parents: 73917bf
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 23:41:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 11 +
query_execution/ExecutionStats.hpp | 18 +
query_execution/Learner.cpp | 195 ++++++++++
query_execution/Learner.hpp | 352 +++++++++++++++++++
query_execution/PolicyEnforcer.cpp | 2 +
query_execution/ProbabilityStore.hpp | 148 ++++++--
.../tests/ProbabilityStore_unittest.cpp | 45 ++-
7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+ ${GFLAGS_LIB_NAME}
+ glog
+ quickstep_queryexecution_ExecutionStats
+ quickstep_queryexecution_ProbabilityStore
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_Learner
quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_Foreman
quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
}
/**
+ * @brief Check if there are any stats present.
+ **/
+ inline bool hasStats() const {
+ for (auto it = active_operators_.begin();
+ it != active_operators_.end();
+ ++it) {
+ if (!it->second->hasStatsForOperator()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* @brief Get the current stats.
*
* @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
DCHECK_LE(times_.size(), max_entries_);
}
+ inline bool hasStatsForOperator() const {
+ return !times_.empty();
+ }
+
private:
const std::size_t max_entries_;
std::deque<std::uint64_t> times_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void Learner::addCompletionFeedback(
+ const serialization::NormalWorkOrderCompletionMessage
+ &workorder_completion_proto) {
+ const std::size_t query_id = workorder_completion_proto.query_id();
+ DCHECK(isQueryPresent(query_id));
+ const std::size_t priority_level = getQueryPriority(query_id);
+ ExecutionStats *execution_stats = getExecutionStats(query_id);
+ DCHECK(execution_stats != nullptr);
+ execution_stats->addEntry(
+ workorder_completion_proto.execution_time_in_microseconds(),
+ workorder_completion_proto.operator_index());
+
+ // updateProbability();
+ if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+ updateFeedbackFromQueriesInPriorityLevel(priority_level);
+ }
+}
+
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+ const std::size_t priority_level, const std::size_t query_id) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (execution_stats_[priority_level].empty()) {
+ LOG(INFO) << "Updating probabilities for query ID: " << query_id
+ << " and priority level: " << priority_level
+ << " that has no queries";
+ return;
+ } else if (execution_stats_[priority_level].size() == 1u) {
+ DCHECK(current_probabilities_[priority_level] != nullptr);
+ DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+ // As we want the probability of the lone query in this priority level as
+ // 1, we set the numerator same as denominator.
+ const std::size_t numerator =
+ current_probabilities_[priority_level]->getDenominator();
+ current_probabilities_[priority_level]->addOrUpdateObject(query_id,
+ numerator);
+ return;
+ }
+ // Else, there are more than one queries for the given priority level.
+ std::unordered_map<std::size_t, std::size_t>
+ mean_workorders_per_query =
+ getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+ const float denominator = calculateDenominator(mean_workorders_per_query);
+ if (denominator != 0) {
+ // Update the numerator for the given query and denominator for all the
+ // queries.
+ DCHECK(mean_workorders_per_query.find(query_id) !=
+ mean_workorders_per_query.end());
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id, mean_workorders_per_query[query_id], denominator);
+ } else {
+ // At least one of the queries has predicted time for next work order as 0.
+ // In such a case, we don't update the probabilities and continue to use
+ // the older probabilities.
+ }
+}
+
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+ const std::size_t priority_level) {
+ if (!hasFeedbackFromAllPriorityLevels() ||
+ has_feedback_from_all_queries_.empty()) {
+ // Either we don't have enough feedback messages from all the priority
+ // levels OR there are no active queries in the system.
+ return;
+ }
+ // Compute the predicted work order execution times for all the level.
+ std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+ std::size_t sum_active_priorities = 0;
+ for (auto priority_iter : has_feedback_from_all_queries_) {
+ std::size_t total_time_curr_level = 0;
+ const std::size_t curr_priority_level = priority_iter.first;
+ sum_active_priorities += curr_priority_level;
+ // For each query, find its predicted work order execution time.
+ const std::unordered_map<std::size_t, std::size_t>
+ mean_workorders_all_queries_curr_level =
+ getMeanWorkOrderTimesForQueriesInPriorityLevel(
+ curr_priority_level);
+ for (auto mean_workorder_entry : mean_workorders_all_queries_curr_level) {
+ total_time_curr_level += mean_workorder_entry.second;
+ }
+ const std::size_t num_queries_in_priority_level =
+ execution_stats_[priority_level].size();
+ DCHECK_GT(num_queries_in_priority_level, 0u);
+ predicted_time_for_level[curr_priority_level] =
+ total_time_curr_level / num_queries_in_priority_level;
+ }
+ DCHECK_GT(sum_active_priorities, 0u);
+ // Now compute the allowable number of work orders for each priority level
+ // that can be executed given a unit total time.
+ // Key = priority level, value = the # of WO mentioned above.
+ std::unordered_map<std::size_t, float> num_workorders_for_level;
+ float total_num_workorders = 0;
+ for (auto predicted_time_iter : predicted_time_for_level) {
+ const std::size_t curr_priority_level = predicted_time_iter.first;
+ const std::size_t num_workorders_for_curr_level =
+ (predicted_time_iter.second == 0)
+ ? 0
+ : static_cast<float>(curr_priority_level) /
+ sum_active_priorities /
+ static_cast<float>(predicted_time_iter.second);
+ num_workorders_for_level[curr_priority_level] = num_workorders_for_curr_level;
+ total_num_workorders += num_workorders_for_curr_level;
+ }
+ if (total_num_workorders == 0) {
+ // No priority level can be selected at this point.
+ return;
+ }
+ // Finally, compute the probabilities.
+ std::vector<std::size_t> priority_levels;
+ std::vector<float> numerators;
+ for (auto num_workorders_iter : num_workorders_for_level) {
+ priority_levels.emplace_back(num_workorders_iter.first);
+ numerators.emplace_back(num_workorders_iter.second);
+ }
+ probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+ priority_levels, numerators, total_num_workorders);
+}
+
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+ for (auto queries_same_priority_level_iter = execution_stats_.begin();
+ queries_same_priority_level_iter != execution_stats_.end();
+ ++queries_same_priority_level_iter) {
+ std::vector<std::size_t> query_ids;
+ const auto &queries_vector = queries_same_priority_level_iter->second;
+ DCHECK(!queries_vector.empty());
+ for (auto query_iter = queries_vector.cbegin();
+ query_iter != queries_vector.cend();
+ ++query_iter) {
+ query_ids.emplace_back(query_iter->first);
+ }
+ // Numerator for each query is 1.0
+ // The common denominator is number of queries with the given priority level.
+ std::vector<float> numerators(query_ids.size(), 1.0);
+ // Reset the probability store for this level.
+ const std::size_t curr_priority_level =
+ queries_same_priority_level_iter->first;
+ default_probabilities_[curr_priority_level].reset(new ProbabilityStore());
+ default_probabilities_[curr_priority_level]
+ ->addOrUpdateObjectsNewDenominator(
+ query_ids, numerators, query_ids.size());
+ }
+}
+
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
+ std::vector<std::size_t> priority_levels;
+ std::vector<float> numerators;
+ float sum_priority_levels = 0;
+ for (auto priority_iter = execution_stats_.cbegin();
+ priority_iter != execution_stats_.cend();
+ ++priority_iter) {
+ sum_priority_levels += priority_iter->second.size();
+ priority_levels.emplace_back(priority_iter->first);
+ numerators.emplace_back(priority_iter->first);
+ }
+ probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+ priority_levels, numerators, sum_priority_levels);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_int32(max_past_entries_learner,
+ 10,
+ "The maximum number of past WorkOrder execution statistics"
+ " entries for a query");
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+class Learner {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ Learner() {
+ }
+
+ void addCompletionFeedback(
+ const serialization::NormalWorkOrderCompletionMessage
+ &workorder_completion_proto);
+
+ void addQuery(const QueryHandle &query_handle) {
+ initializePriorityLevelIfNotPresent(query_handle.query_priority());
+ initializeQuery(query_handle);
+ relearn();
+ }
+
+ void removeQuery(const std::size_t query_id) {
+ // Find the iterator to the query in execution_stats_.
+ DCHECK(isQueryPresent(query_id));
+ const std::size_t priority_level = getQueryPriority(query_id);
+ auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
+ execution_stats_[priority_level].erase(stats_iter_mutable);
+ current_probabilities_[priority_level]->removeObject(query_id);
+ checkAndRemovePriorityLevel(priority_level);
+ relearn();
+ }
+
+ void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+ ExecutionStats *stats = getExecutionStats(query_id);
+ DCHECK(stats != nullptr);
+ stats->removeOperator(operator_id);
+ }
+
+ void relearn() {
+ if (hasActiveQueries()) {
+ initializeDefaultProbabilitiesForAllQueries();
+ initializeDefaultProbabilitiesForPriorityLevels();
+ }
+ }
+
+ void updateProbabilitiesForQueriesInPriorityLevel(
+ const std::size_t priority_level, const std::size_t query_id);
+
+ // TODO(harshad) - Cache internal results from previous invocation of this
+ // function and reuse them. There's a lot of redundancy in computations
+ // at this point.
+ void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+ /**
+ * @brief Initialize the default probabilities for the queries.
+ **/
+ void initializeDefaultProbabilitiesForAllQueries();
+
+ /**
+ * @brief Initialize the default probabilities for the priority levels.
+ **/
+ void initializeDefaultProbabilitiesForPriorityLevels();
+
+ /**
+ * @brief Initialize the data structures for a given priority level, if none
+ * exist. If there are already data structures for the given priority
+ * level, do nothing.
+ **/
+ inline void initializePriorityLevelIfNotPresent(
+ const std::size_t priority_level) {
+ if (isPriorityLevelPresent(priority_level)) {
+ current_probabilities_[priority_level].reset(new ProbabilityStore());
+ // Calculate the default probability for the priority level here and use
+ // it instead of 0.5 here.
+ // TODO(harshad) - Correct this.
+ probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+ execution_stats_[priority_level];
+ }
+ }
+
+ /**
+ * @brief First check if the priority level needs to be removed from the local
+ * data structures and remove if needed.
+ *
+ * @param priority_level The priority level.
+ **/
+ inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (execution_stats_[priority_level].empty()) {
+ execution_stats_.erase(priority_level);
+ current_probabilities_.erase(priority_level);
+ probabilities_of_priority_levels_->removeObject(priority_level);
+ has_feedback_from_all_queries_.erase(priority_level);
+ }
+ }
+
+ /**
+ * @brief Check if the Learner has presence of the given priority level.
+ **/
+ inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+ DCHECK_EQ((current_probabilities_.find(priority_level) ==
+ current_probabilities_.end()),
+ execution_stats_.find(priority_level) == execution_stats_.end());
+ return (execution_stats_.find(priority_level) != execution_stats_.end());
+ }
+
+ /**
+ * @brief Check if the query is present.
+ **/
+ inline bool isQueryPresent(const std::size_t query_id) const {
+ return query_id_to_priority_lookup_.find(query_id) !=
+ query_id_to_priority_lookup_.end();
+ }
+
+ /**
+ * @brief Initialize all the data structures for a new query.
+ *
+ * @param query_handle The query handle for the new query.
+ **/
+ void initializeQuery(const QueryHandle &query_handle) {
+ const std::size_t priority_level = query_handle.query_priority();
+ const std::size_t query_id = query_handle.query_id();
+ DCHECK(isPriorityLevelPresent(priority_level));
+ query_id_to_priority_lookup_[query_id] = priority_level;
+ execution_stats_[priority_level].emplace_back(
+ query_id,
+ std::unique_ptr<ExecutionStats>(
+ new ExecutionStats(FLAGS_max_past_entries_learner)));
+ // As we are initializing the query, we obviously haven't gotten any
+ // feedback message for this query. Hence mark the following field as false.
+ has_feedback_from_all_queries_[priority_level] = false;
+ }
+
+ /**
+ * @brief Get the execution stats object for the given query.
+ *
+ * @return A pointer to the ExecutionStats for the query. If not present,
+ * returns NULL.
+ **/
+ inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+ if (isQueryPresent(query_id)) {
+ const auto stats_iter = getExecutionStatsIterMutable(query_id);
+ DCHECK(stats_iter !=
+ std::end(execution_stats_[getQueryPriority(query_id)]));
+ return stats_iter->second.get();
+ }
+ return nullptr;
+ }
+
+ /**
+ * @brief This function works well when the query and priority level exists
+ * in the data structures.
+ *
+ **/
+ inline std::vector<
+ std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+ getExecutionStatsIterMutable(const std::size_t query_id) {
+ const std::size_t priority_level = getQueryPriority(query_id);
+ const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+ &stats_vector = execution_stats_[priority_level];
+ // The following line uses std::find_if to reach to the desired element
+ // in the stats_vector.
+ auto stats_iter = std::find_if(
+ stats_vector.begin(),
+ stats_vector.end(),
+ [&query_id](
+ const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+ return p.first == query_id;
+ });
+ return stats_iter;
+ }
+
+ inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+ const auto it = query_id_to_priority_lookup_.find(query_id);
+ DCHECK(it != query_id_to_priority_lookup_.end());
+ return it->second;
+ }
+
+ /**
+ * @brief Check if we have received at least one feedback message from all the
+ * queries in the given priority level.
+ **/
+ inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+ const std::size_t priority_level) const {
+ const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+ &stats_vector = execution_stats_.at(priority_level);
+ for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+ DCHECK(stats_vector[i].second != nullptr);
+ if (!stats_vector[i].second->hasStats()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ inline void updateFeedbackFromQueriesInPriorityLevel(
+ const std::size_t priority_level) {
+ const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+ &stats_vector = execution_stats_.at(priority_level);
+ for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+ DCHECK(stats_vector[i].second != nullptr);
+ if (!stats_vector[i].second->hasStats()) {
+ // At least one query has no statistics so far.
+ return;
+ }
+ }
+ // All the queries have at least one execution statistic.
+ has_feedback_from_all_queries_[priority_level] = true;
+ }
+
+ inline const std::size_t hasActiveQueries() const {
+ return !query_id_to_priority_lookup_.empty();
+ }
+
+ /**
+ * @brief Get the mean work order execution times for all the queries in
+ * a given priority level.
+ *
+ * @param priority_level The priority level.
+ *
+ * @return An unordered_map in which: Key = query ID.
+ * Value = Mean time per work order for that query.
+ **/
+ inline std::unordered_map<std::size_t, std::size_t>
+ getMeanWorkOrderTimesForQueriesInPriorityLevel(
+ const std::size_t priority_level) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ std::unordered_map<std::size_t, std::size_t> result;
+ for (auto it = execution_stats_[priority_level].begin();
+ it != execution_stats_[priority_level].end();
+ ++it) {
+ DCHECK(it->second.get() != nullptr);
+ auto query_stats = it->second->getCurrentStats();
+ result[it->first] =
+ query_stats.second == 0 ? 0 : query_stats.first / query_stats.second;
+ }
+ return result;
+ }
+
+ /**
+ * @param mean_workorder_per_query A vector of pairs in which:
+ * 1st element is mean time per work order
+ * 2nd element is the query ID.
+ *
+ * @note If any query has mean work order time as 0, we return 0 as the
+ * denominator.
+ *
+ * @return The denominator to be used for probability calculations.
+ **/
+ inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
+ &mean_workorder_per_query) const {
+ float denominator = 0;
+ for (const auto &element : mean_workorder_per_query) {
+ if (element.second != 0) {
+ denominator += 1/static_cast<float>(element.second);
+ } else {
+ return 0;
+ }
+ }
+ return denominator;
+ }
+
+ inline bool hasFeedbackFromAllPriorityLevels() const {
+ for (auto feedback : has_feedback_from_all_queries_) {
+ if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Key = Priority level, value = A vector of pairs.
+ // Each pair:
+ // 1st element: Query ID.
+ // 2nd Element: Execution statistics for the query.
+ std::unordered_map<
+ std::size_t,
+ std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+ execution_stats_;
+
+ // Key = query ID, value = priority level for the query ID.
+ std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+ // Key = priority level, value = ProbabilityStore for the queries belonging to
+ // that priority level.
+ std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+ current_probabilities_;
+
+ // Key = priority level, value = ProbabilityStore for the queries belonging to
+ // that priority level.
+ std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+ default_probabilities_;
+
+ // ProbabilityStrore for probabilities mapped to the priority levels.
+ std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+ // Key = priority level. Value = A boolean that indicates if we have received
+ // feedback from all the queries in the given priority level.
+ // TODO(harshad) - Invalidate the cache whenever needed.
+ std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+ DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+ Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <random>
#include <unordered_map>
+#include <utility>
#include <vector>
#include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : mt_(std::random_device()()) {}
+ : common_denominator_(1.0), mt_(std::random_device()()) {}
/**
* @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
return individual_probabilities_.size();
}
+ inline const std::size_t getDenominator() const {
+ return common_denominator_;
+ }
+
/**
* @brief Add individual (not cumulative) probability for a given object.
*
@@ -59,16 +64,48 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param property The property of the given object.
- * @param individual_probability The individual (not cumulative) probability
- * of the given object.
+ * @param numerator The numerator for the given object.
**/
- void addProbability(const std::size_t property,
- const float individual_probability) {
- individual_probabilities_[property] = individual_probability;
+ void addOrUpdateObject(const std::size_t property,
+ const float numerator) {
+ DCHECK_LE(numerator, common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[property] =
+ std::make_pair(numerator, numerator / common_denominator_);
updateCumulativeProbabilities();
}
/**
+ * @brief Add individual (not cumulative) probability for a given object with
+ * updated denominator.
+ *
+ * @note This function leaves the cumulative probabilities in a consistent
+ * state. An alternative lazy implementation should be written if cost
+ * of calculating cumulative probabilities is high.
+ * @note This function may override previously written probability values.
+ *
+ * @param property The property of the given object.
+ * @param numerator The numerator for the given object.
+ * @param new_denominator The updated denominator for the store.
+ **/
+ void addOrUpdateObjectNewDenominator(const std::size_t property,
+ const float numerator,
+ const float new_denominator) {
+ CHECK_GT(new_denominator, 0u);
+ DCHECK_LE(numerator, new_denominator);
+ common_denominator_ = new_denominator;
+ // It is alright to not have the correct probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we compute the probabilities for all the objects in
+ // updateProbabilitiesNewDenominator().
+ individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+ updateProbabilitiesNewDenominator();
+ }
+
+ /**
* @brief Add individual (not cumulative) probabilities for given objects.
*
* @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param properties A vector of properties to be added.
- * @param individual_probabilities The individual (not cumulative)
- * probabilities of the given objects.
+ * @param numerators The numerators of the given objects.
**/
- void addProbabilities(const std::vector<std::size_t> &properties,
- const std::vector<float> &individual_probabilities) {
- DCHECK_EQ(properties.size(), individual_probabilities.size());
+ void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+ const std::vector<float> &numerators) {
+ DCHECK_EQ(properties.size(), numerators.size());
for (std::size_t i = 0; i < properties.size(); ++i) {
- individual_probabilities_[properties[i]] = individual_probabilities[i];
+ DCHECK_LE(numerators[i], common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[properties[i]] =
+ std::make_pair(numerators[i], numerators[i] / common_denominator_);
}
updateCumulativeProbabilities();
}
- /**
- * @brief Update the probability of a given object to a new value.
- *
- * @param property The property of the object.
- * @param new_individual_probability The new probability to be set.
- **/
- void updateProbability(const std::size_t property,
- const float new_individual_probability) {
- auto it = individual_probabilities_.find(property);
- DCHECK(it != individual_probabilities_.end());
- it->second = new_individual_probability;
- updateCumulativeProbabilities();
+ void addOrUpdateObjectsNewDenominator(
+ const std::vector<std::size_t> &properties,
+ const std::vector<float> &numerators,
+ const float new_denominator) {
+ CHECK_GT(new_denominator, 0u);
+ DCHECK_EQ(properties.size(), numerators.size());
+ common_denominator_ = new_denominator;
+ for (std::size_t i = 0; i < properties.size(); ++i) {
+ DCHECK_LE(numerators[i], common_denominator_);
+ // It is alright to not have the correct probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we compute the probabilities for all the objects in
+ // updateProbabilitiesNewDenominator().
+ individual_probabilities_[properties[i]] =
+ std::make_pair(numerators[i], 0.0);
+ }
+ updateProbabilitiesNewDenominator();
}
  /**
   * @brief Remove an object from the store and rescale the common
   *        denominator to the sum of the remaining numerators.
   *
   * @param property The property of the object to be removed.
   **/
  void removeObject(const std::size_t property) {
    auto individual_it = individual_probabilities_.find(property);
    DCHECK(individual_it != individual_probabilities_.end());
    individual_probabilities_.erase(individual_it);
    if (!individual_probabilities_.empty()) {
      // The new denominator is the sum of the numerators of the remaining
      // objects.
      float new_denominator = 0;
      for (auto it = individual_probabilities_.begin();
           it != individual_probabilities_.end();
           ++it) {
        new_denominator += it->second.first;
      }
      CHECK_GT(new_denominator, 0);
      common_denominator_ = new_denominator;
      // NOTE(review): only the cumulative distribution is rebuilt here; the
      // stored individual probabilities still reflect the OLD denominator
      // while common_denominator_ now holds the new sum (and the unit test
      // RemoveObjectTest codifies this). Confirm the inconsistency is
      // intended — updateProbabilitiesNewDenominator() would renormalize.
      updateCumulativeProbabilities();
    } else {
      // In order to keep the store consistent, we should keep the sizes of
      // individual_probabilities_ and cumulative_probabilities_ the same.
      cumulative_probabilities_.clear();
    }
  }
/**
@@ -123,7 +184,7 @@ class ProbabilityStore {
const float getIndividualProbability(const std::size_t property) const {
const auto it = individual_probabilities_.find(property);
DCHECK(it != individual_probabilities_.end());
- return it->second;
+ return it->second.second;
}
/**
@@ -141,13 +202,13 @@ class ProbabilityStore {
return;
}
float cumulative_probability = 0;
- for (const auto property_probability_pair : individual_probabilities_) {
- cumulative_probabilities_.emplace_back(property_probability_pair.first,
+ for (const auto p : individual_probabilities_) {
+ cumulative_probabilities_.emplace_back(p.first,
cumulative_probability);
- cumulative_probability += property_probability_pair.second;
+ cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that floating
- // addition related rounding issues are ignored.
+ // Adjust the last cumulative probability manually to 1.0, so that
+ // floating addition related rounding issues are ignored.
cumulative_probabilities_.back().updateProbability(1.0);
}
@@ -208,9 +269,26 @@ class ProbabilityStore {
return it->property_;
}
- std::unordered_map<std::size_t, float> individual_probabilities_;
+ inline void updateProbabilitiesNewDenominator() {
+ // First update the individual probabilities.
+ for (auto it = individual_probabilities_.begin();
+ it != individual_probabilities_.end();
+ ++it) {
+ DCHECK_LE(it->second.first, common_denominator_);
+ it->second.second = it->second.first / common_denominator_;
+ }
+ updateCumulativeProbabilities();
+ }
+
  // Key = property of the object.
  // Value = pair of:
  //   1st element: Numerator of the object.
  //   2nd element: Individual probability of the object
  //                (numerator / common_denominator_).
  std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
  std::vector<ProbabilityInfo> cumulative_probabilities_;
  // NOTE(review): no in-class initializer — confirm the constructor (not
  // visible here) sets common_denominator_ before any division by it.
  float common_denominator_;

  std::mt19937_64 mt_;

  DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fbda4ac/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
// Verifies that getNumObjects() tracks single and bulk additions as well as
// removals.
TEST(ProbabilityStoreTest, CountTest) {
  ProbabilityStore store;
  EXPECT_EQ(0u, store.getNumObjects());

  // Add a single object, then remove it again.
  const std::size_t kProperty = 0;
  store.addOrUpdateObject(kProperty, 1);
  EXPECT_EQ(1u, store.getNumObjects());

  store.removeObject(kProperty);
  EXPECT_EQ(0u, store.getNumObjects());

  // Bulk-add several objects together with a fresh denominator.
  std::vector<std::size_t> objects {3, 5, 7, 9};
  std::vector<float> numerators {1, 2, 3, 5};
  const std::size_t kNewDenominator = 10;
  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
  EXPECT_EQ(objects.size(), store.getNumObjects());
}
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
// Verifies that after a bulk add with an explicit denominator, every object's
// individual probability equals numerator / denominator.
TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
  ProbabilityStore store;
  std::vector<std::size_t> objects {3, 5, 7, 9};
  std::vector<float> numerators {1, 2, 3, 5};
  const std::size_t kNewDenominator = 10;
  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);

  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
    // Both sides perform the same float division, so exact equality holds.
    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
              store.getIndividualProbability(objects[object_num]));
  }
}
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
const std::size_t kNumTrials = 10;
while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
}
}
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+ ProbabilityStore store;
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ store.getIndividualProbability(objects[object_num]));
+ }
+
+ // Remove last object "9", with numerator 5.
+ store.removeObject(objects.back());
+ objects.pop_back();
+ numerators.pop_back();
+ const float expected_new_denominator =
+ std::accumulate(numerators.begin(), numerators.end(), 0);
+
+ EXPECT_EQ(expected_new_denominator, store.getDenominator());
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ store.getIndividualProbability(objects[object_num]));
+ }
+}
+
} // namespace quickstep