Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/28 14:50:41 UTC

[08/14] incubator-quickstep git commit: Created Learner class.

Created Learner class.

- Learner keeps track of the execution statistics of concurrent queries.
- It maintains the probabilities for individual queries as well as for the
  priority levels in the system.
- Changes in the ProbabilityStore class, including the addition of a
  numerator, a common denominator, and more unit tests.
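
For a rough, standalone illustration of the numerator/denominator bookkeeping
mentioned above (illustrative names only, not the committed ProbabilityStore
API): each object keeps its numerator, the store keeps one shared denominator,
and changing that denominator rescales every probability at once.

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>

    // Illustrative sketch: probabilities are derived on demand from per-object
    // numerators and a single shared denominator.
    class TinyProbabilityTable {
     public:
      void setEntry(const std::size_t id, const float numerator) {
        numerators_[id] = numerator;
      }

      void setDenominator(const float denominator) {
        denominator_ = denominator;
      }

      float probability(const std::size_t id) const {
        return denominator_ > 0 ? numerators_.at(id) / denominator_ : 0.0f;
      }

     private:
      std::unordered_map<std::size_t, float> numerators_;
      float denominator_ = 1.0f;
    };

    int main() {
      TinyProbabilityTable table;
      table.setEntry(1, 1.0f);  // Query 1.
      table.setEntry(2, 3.0f);  // Query 2.
      table.setDenominator(4.0f);
      // Prints 0.25 and 0.75.
      std::cout << table.probability(1) << " " << table.probability(2) << "\n";
      return 0;
    }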


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ee5c63fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ee5c63fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ee5c63fb

Branch: refs/heads/scheduler++
Commit: ee5c63fb0a5b0f2115dc51248bca86b44139c79a
Parents: 12142d9
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jun 28 09:50:12 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  11 +
 query_execution/ExecutionStats.hpp              |  18 +
 query_execution/Learner.cpp                     | 195 ++++++++++
 query_execution/Learner.hpp                     | 352 +++++++++++++++++++
 query_execution/PolicyEnforcer.cpp              |   2 +
 query_execution/ProbabilityStore.hpp            | 148 ++++++--
 .../tests/ProbabilityStore_unittest.cpp         |  45 ++-
 7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
 add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+                      ${GFLAGS_LIB_NAME}
+                      glog
+                      quickstep_queryexecution_ExecutionStats
+                      quickstep_queryexecution_ProbabilityStore
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
   }
 
   /**
+   * @brief Check if all the active operators have at least one stats entry.
+   **/
+  inline bool hasStats() const {
+    for (auto it = active_operators_.begin();
+         it != active_operators_.end();
+         ++it) {
+      if (!it->second->hasStatsForOperator()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * @brief Get the current stats.
    *
    * @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
       DCHECK_LE(times_.size(), max_entries_);
     }
 
+    inline bool hasStatsForOperator() const {
+      return !times_.empty();
+    }
+
    private:
     const std::size_t max_entries_;
     std::deque<std::uint64_t> times_;

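For reference, a minimal standalone sketch of the bounded-history idea behind
these stats (illustrative names, not the committed ExecutionStats class): keep
only the most recent N work order times and report their total and count, from
which the caller derives a mean.

    #include <cstddef>
    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <utility>

    // Illustrative sketch: a bounded window of recent work order times.
    class RecentTimes {
     public:
      explicit RecentTimes(const std::size_t max_entries)
          : max_entries_(max_entries) {}

      void addEntry(const std::uint64_t value) {
        if (times_.size() == max_entries_) {
          times_.pop_front();  // Drop the oldest entry to bound the window.
        }
        times_.push_back(value);
      }

      bool hasStats() const {
        return !times_.empty();
      }

      // Returns (total time, number of entries); mean = total / count.
      std::pair<std::uint64_t, std::uint64_t> currentStats() const {
        std::uint64_t total = 0;
        for (const std::uint64_t t : times_) {
          total += t;
        }
        return std::make_pair(total, times_.size());
      }

     private:
      const std::size_t max_entries_;
      std::deque<std::uint64_t> times_;
    };

    int main() {
      RecentTimes window(3);
      window.addEntry(100);
      window.addEntry(200);
      window.addEntry(300);
      window.addEntry(400);  // Evicts the oldest entry (100).
      const auto stats = window.currentStats();
      std::cout << "mean = " << stats.first / stats.second << "\n";  // 300
      return 0;
    }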
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin-Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void Learner::addCompletionFeedback(
+    const serialization::NormalWorkOrderCompletionMessage
+        &workorder_completion_proto) {
+  const std::size_t query_id = workorder_completion_proto.query_id();
+  DCHECK(isQueryPresent(query_id));
+  const std::size_t priority_level = getQueryPriority(query_id);
+  ExecutionStats *execution_stats = getExecutionStats(query_id);
+  DCHECK(execution_stats != nullptr);
+  execution_stats->addEntry(
+      workorder_completion_proto.execution_time_in_microseconds(),
+      workorder_completion_proto.operator_index());
+
+  // updateProbability();
+  if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+    updateFeedbackFromQueriesInPriorityLevel(priority_level);
+  }
+}
+
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+    const std::size_t priority_level, const std::size_t query_id) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    LOG(INFO) << "Updating probabilities for query ID: " << query_id
+              << " and priority level: " << priority_level
+              << " that has no queries";
+    return;
+  } else if (execution_stats_[priority_level].size() == 1u) {
+    DCHECK(current_probabilities_[priority_level] != nullptr);
+    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+    // We want the probability of the lone query in this priority level to be
+    // 1, so we set the numerator equal to the denominator.
+    const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();
+    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
+                                                              numerator);
+    return;
+  }
+  // Otherwise, there is more than one query for the given priority level.
+  std::unordered_map<std::size_t, std::size_t>
+      mean_workorders_per_query =
+          getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+  const float denominator = calculateDenominator(mean_workorders_per_query);
+  if (denominator != 0) {
+    // Update the numerator for the given query and denominator for all the
+    // queries.
+    DCHECK(mean_workorders_per_query.find(query_id) !=
+           mean_workorders_per_query.end());
+    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+        query_id,
+        1 / static_cast<float>(mean_workorders_per_query[query_id]),
+        denominator);
+  } else {
+    // At least one of the queries has a predicted time of 0 for its next work
+    // order. In such a case, we don't update the probabilities and continue
+    // to use the older probabilities.
+  }
+}
+
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+    const std::size_t priority_level) {
+  if (!hasFeedbackFromAllPriorityLevels() ||
+      has_feedback_from_all_queries_.empty()) {
+    // Either we don't have enough feedback messages from all the priority
+    // levels OR there are no active queries in the system.
+    return;
+  }
+  // Compute the predicted work order execution times for all the priority levels.
+  std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+  std::size_t sum_active_priorities = 0;
+  for (auto priority_iter : has_feedback_from_all_queries_) {
+    std::size_t total_time_curr_level = 0;
+    const std::size_t curr_priority_level = priority_iter.first;
+    sum_active_priorities += curr_priority_level;
+    // For each query, find its predicted work order execution time.
+    const std::unordered_map<std::size_t, std::size_t>
+        mean_workorders_all_queries_curr_level =
+            getMeanWorkOrderTimesForQueriesInPriorityLevel(
+                curr_priority_level);
+    for (auto mean_workorder_entry : mean_workorders_all_queries_curr_level) {
+      total_time_curr_level += mean_workorder_entry.second;
+    }
+    const std::size_t num_queries_in_priority_level =
+        execution_stats_[curr_priority_level].size();
+    DCHECK_GT(num_queries_in_priority_level, 0u);
+    predicted_time_for_level[curr_priority_level] =
+        total_time_curr_level / num_queries_in_priority_level;
+  }
+  DCHECK_GT(sum_active_priorities, 0u);
+  // Now compute the allowable number of work orders for each priority level
+  // that can be executed given a unit total time.
+  // Key = priority level, value = the number of work orders described above.
+  std::unordered_map<std::size_t, float> num_workorders_for_level;
+  float total_num_workorders = 0;
+  for (auto predicted_time_iter : predicted_time_for_level) {
+    const std::size_t curr_priority_level = predicted_time_iter.first;
+    const std::size_t num_workorders_for_curr_level =
+        (predicted_time_iter.second == 0)
+            ? 0
+            : static_cast<float>(curr_priority_level) /
+                  sum_active_priorities /
+                  static_cast<float>(predicted_time_iter.second);
+    num_workorders_for_level[curr_priority_level] = num_workorders_for_curr_level;
+    total_num_workorders += num_workorders_for_curr_level;
+  }
+  if (total_num_workorders == 0) {
+    // No priority level can be selected at this point.
+    return;
+  }
+  // Finally, compute the probabilities.
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  for (auto num_workorders_iter : num_workorders_for_level) {
+    priority_levels.emplace_back(num_workorders_iter.first);
+    numerators.emplace_back(num_workorders_iter.second);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, total_num_workorders);
+}
+
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+  for (auto queries_same_priority_level_iter = execution_stats_.begin();
+       queries_same_priority_level_iter != execution_stats_.end();
+       ++queries_same_priority_level_iter) {
+    std::vector<std::size_t> query_ids;
+    const auto &queries_vector = queries_same_priority_level_iter->second;
+    DCHECK(!queries_vector.empty());
+    for (auto query_iter = queries_vector.cbegin();
+         query_iter != queries_vector.cend();
+         ++query_iter) {
+      query_ids.emplace_back(query_iter->first);
+    }
+    // The numerator for each query is 1.0 and the common denominator is the
+    // number of queries with the given priority level.
+    std::vector<float> numerators(query_ids.size(), 1.0);
+    // Reset the probability store for this level.
+    const std::size_t curr_priority_level =
+        queries_same_priority_level_iter->first;
+    default_probabilities_[curr_priority_level].reset(new ProbabilityStore());
+    default_probabilities_[curr_priority_level]
+        ->addOrUpdateObjectsNewDenominator(
+            query_ids, numerators, query_ids.size());
+  }
+}
+
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  float sum_priority_levels = 0;
+  for (auto priority_iter = execution_stats_.cbegin();
+       priority_iter != execution_stats_.cend();
+       ++priority_iter) {
+    sum_priority_levels += priority_iter->second.size();
+    priority_levels.emplace_back(priority_iter->first);
+    numerators.emplace_back(priority_iter->first);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, sum_priority_levels);
+}
+
+}  // namespace quickstep

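The intent of updateProbabilitiesForQueriesInPriorityLevel() above is to weight
each query by the reciprocal of its mean work order time, with
calculateDenominator() (in Learner.hpp) summing those reciprocals. A
self-contained sketch of that weighting, using illustrative names rather than
the committed Learner/ProbabilityStore API:

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>

    // Weight each query by 1 / (mean work order time), so queries with cheaper
    // work orders are picked more often. Returns an empty map if any mean time
    // is 0, mirroring the "keep the older probabilities" case above.
    std::unordered_map<std::size_t, float> inverseTimeProbabilities(
        const std::unordered_map<std::size_t, std::size_t> &mean_times) {
      float denominator = 0;
      for (const auto &entry : mean_times) {
        if (entry.second == 0) {
          return {};
        }
        denominator += 1.0f / static_cast<float>(entry.second);
      }
      std::unordered_map<std::size_t, float> probabilities;
      for (const auto &entry : mean_times) {
        probabilities[entry.first] =
            (1.0f / static_cast<float>(entry.second)) / denominator;
      }
      return probabilities;
    }

    int main() {
      // Query 1 averages 100 us per work order, query 2 averages 300 us.
      const std::unordered_map<std::size_t, std::size_t> mean_times = {
          {1, 100}, {2, 300}};
      // Prints a probability of 0.75 for query 1 and 0.25 for query 2.
      for (const auto &entry : inverseTimeProbabilities(mean_times)) {
        std::cout << "query " << entry.first << ": " << entry.second << "\n";
      }
      return 0;
    }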
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin-Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_int32(max_past_entries_learner,
+              10,
+              "The maximum number of past WorkOrder execution statistics"
+              " entries for a query");
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+class Learner {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Learner() {
+    // Initialize the store for priority level probabilities up front, so that
+    // initializePriorityLevelIfNotPresent() can safely add to it for the very
+    // first query.
+    probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  }
+
+  void addCompletionFeedback(
+      const serialization::NormalWorkOrderCompletionMessage
+          &workorder_completion_proto);
+
+  void addQuery(const QueryHandle &query_handle) {
+    initializePriorityLevelIfNotPresent(query_handle.query_priority());
+    initializeQuery(query_handle);
+    relearn();
+  }
+
+  void removeQuery(const std::size_t query_id) {
+    // Find the iterator to the query in execution_stats_.
+    DCHECK(isQueryPresent(query_id));
+    const std::size_t priority_level = getQueryPriority(query_id);
+    auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
+    execution_stats_[priority_level].erase(stats_iter_mutable);
+    current_probabilities_[priority_level]->removeObject(query_id);
+    checkAndRemovePriorityLevel(priority_level);
+    relearn();
+  }
+
+  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+    ExecutionStats *stats = getExecutionStats(query_id);
+    DCHECK(stats != nullptr);
+    stats->removeOperator(operator_id);
+  }
+
+  void relearn() {
+    if (hasActiveQueries()) {
+      initializeDefaultProbabilitiesForAllQueries();
+      initializeDefaultProbabilitiesForPriorityLevels();
+    }
+  }
+
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  // TODO(harshad) - Cache internal results from previous invocation of this
+  // function and reuse them. There's a lot of redundancy in computations
+  // at this point.
+  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+  /**
+   * @brief Initialize the default probabilities for the queries.
+   **/
+  void initializeDefaultProbabilitiesForAllQueries();
+
+  /**
+   * @brief Initialize the default probabilities for the priority levels.
+   **/
+  void initializeDefaultProbabilitiesForPriorityLevels();
+
+  /**
+   * @brief Initialize the data structures for a given priority level, if none
+   *        exist. If there are already data structures for the given priority
+   *        level, do nothing.
+   **/
+  inline void initializePriorityLevelIfNotPresent(
+      const std::size_t priority_level) {
+    if (!isPriorityLevelPresent(priority_level)) {
+      current_probabilities_[priority_level].reset(new ProbabilityStore());
+      // TODO(harshad) - Calculate the default probability for the priority
+      // level and use it below instead of the placeholder 0.
+      probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+      execution_stats_[priority_level];
+    }
+  }
+
+  /**
+   * @brief First check if the priority level needs to be removed from the local
+   *        data structures and remove if needed.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (execution_stats_[priority_level].empty()) {
+      execution_stats_.erase(priority_level);
+      current_probabilities_.erase(priority_level);
+      probabilities_of_priority_levels_->removeObject(priority_level);
+      has_feedback_from_all_queries_.erase(priority_level);
+    }
+  }
+
+  /**
+   * @brief Check if the Learner is tracking the given priority level.
+   **/
+  inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+    DCHECK_EQ((current_probabilities_.find(priority_level) ==
+               current_probabilities_.end()),
+              execution_stats_.find(priority_level) == execution_stats_.end());
+    return (execution_stats_.find(priority_level) != execution_stats_.end());
+  }
+
+  /**
+   * @brief Check if the query is present.
+   **/
+  inline bool isQueryPresent(const std::size_t query_id) const {
+    return query_id_to_priority_lookup_.find(query_id) !=
+           query_id_to_priority_lookup_.end();
+  }
+
+  /**
+   * @brief Initialize all the data structures for a new query.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
+  void initializeQuery(const QueryHandle &query_handle) {
+    const std::size_t priority_level = query_handle.query_priority();
+    const std::size_t query_id = query_handle.query_id();
+    DCHECK(isPriorityLevelPresent(priority_level));
+    query_id_to_priority_lookup_[query_id] = priority_level;
+    execution_stats_[priority_level].emplace_back(
+        query_id,
+        std::unique_ptr<ExecutionStats>(
+            new ExecutionStats(FLAGS_max_past_entries_learner)));
+    // As the query is just being initialized, we have not received any
+    // feedback messages for it yet, so mark the following field as false.
+    has_feedback_from_all_queries_[priority_level] = false;
+  }
+
+  /**
+   * @brief Get the execution stats object for the given query.
+   *
+   * @return A pointer to the ExecutionStats for the query. If not present,
+   *         returns NULL.
+   **/
+  inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+    if (isQueryPresent(query_id)) {
+      const auto stats_iter = getExecutionStatsIterMutable(query_id);
+      DCHECK(stats_iter !=
+             std::end(execution_stats_[getQueryPriority(query_id)]));
+      return stats_iter->second.get();
+    }
+    return nullptr;
+  }
+
+  /**
+   * @brief Get an iterator to the execution stats entry for the given query.
+   *
+   * @note This function assumes that the query and its priority level exist
+   *       in the data structures.
+   **/
+  inline std::vector<
+      std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+      getExecutionStatsIterMutable(const std::size_t query_id) {
+    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_[priority_level];
+    // Use std::find_if to locate the desired element in stats_vector.
+    auto stats_iter = std::find_if(
+        stats_vector.begin(),
+        stats_vector.end(),
+        [&query_id](
+            const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+          return p.first == query_id;
+        });
+    return stats_iter;
+  }
+
+  inline std::size_t getQueryPriority(const std::size_t query_id) const {
+    const auto it = query_id_to_priority_lookup_.find(query_id);
+    DCHECK(it != query_id_to_priority_lookup_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Check if we have received at least one feedback message from all the
+   *        queries in the given priority level.
+   **/
+  inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_.at(priority_level);
+    for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+      DCHECK(stats_vector[i].second != nullptr);
+      if (!stats_vector[i].second->hasStats()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  inline void updateFeedbackFromQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_.at(priority_level);
+    for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+      DCHECK(stats_vector[i].second != nullptr);
+      if (!stats_vector[i].second->hasStats()) {
+        // At least one query has no statistics so far.
+        return;
+      }
+    }
+    // All the queries have at least one execution statistic.
+    has_feedback_from_all_queries_[priority_level] = true;
+  }
+
+  inline bool hasActiveQueries() const {
+    return !query_id_to_priority_lookup_.empty();
+  }
+
+  /**
+   * @brief Get the mean work order execution times for all the queries in
+   *        a given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return An unordered_map in which: Key = query ID.
+   *         Value = Mean time per work order for that query.
+   **/
+  inline std::unordered_map<std::size_t, std::size_t>
+  getMeanWorkOrderTimesForQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    std::unordered_map<std::size_t, std::size_t> result;
+    for (auto it = execution_stats_[priority_level].begin();
+         it != execution_stats_[priority_level].end();
+         ++it) {
+      DCHECK(it->second.get() != nullptr);
+      auto query_stats = it->second->getCurrentStats();
+      result[it->first] =
+          query_stats.second == 0 ? 0 : query_stats.first / query_stats.second;
+    }
+    return result;
+  }
+
+  /**
+   * @param mean_workorder_per_query An unordered_map in which:
+   *        Key = query ID.
+   *        Value = Mean time per work order for that query.
+   *
+   * @note If any query has mean work order time as 0, we return 0 as the
+   *       denominator.
+   *
+   * @return The denominator to be used for probability calculations.
+   **/
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
+    float denominator = 0;
+    for (const auto &element : mean_workorder_per_query) {
+      if (element.second != 0) {
+        denominator += 1/static_cast<float>(element.second);
+      } else {
+        return 0;
+      }
+    }
+    return denominator;
+  }
+
+  inline bool hasFeedbackFromAllPriorityLevels() const {
+    for (auto feedback : has_feedback_from_all_queries_) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Key = Priority level, value = A vector of pairs.
+  // Each pair:
+  // 1st element: Query ID.
+  // 2nd Element: Execution statistics for the query.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+      execution_stats_;
+
+  // Key = query ID, value = priority level for the query ID.
+  std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+  // Key = priority level, value = ProbabilityStore for the queries belonging to
+  // that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      current_probabilities_;
+
+  // Key = priority level, value = ProbabilityStore with the default (uniform)
+  // probabilities for the queries belonging to that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      default_probabilities_;
+
+  // ProbabilityStore for the probabilities of the priority levels.
+  std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+  // Key = priority level. Value = A boolean that indicates if we have received
+  // feedback from all the queries in the given priority level.
+  // TODO(harshad) - Invalidate the cache whenever needed.
+  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_

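updateProbabilitiesOfAllPriorityLevels() (declared above, defined in
Learner.cpp) splits a unit of execution time across the active priority levels:
each level's share of work orders is proportional to its priority and inversely
proportional to its predicted time per work order, and the shares are then
normalized into probabilities. A self-contained sketch of that weighting, with
illustrative names rather than the Learner API:

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>

    // Input:  key = priority level, value = predicted time per work order.
    // Output: key = priority level, value = probability of picking that level.
    std::unordered_map<std::size_t, float> priorityLevelProbabilities(
        const std::unordered_map<std::size_t, std::size_t> &predicted_time) {
      std::size_t sum_priorities = 0;
      for (const auto &entry : predicted_time) {
        sum_priorities += entry.first;
      }
      // Work orders per unit time for each level:
      // (priority / sum of priorities) / (predicted time per work order).
      std::unordered_map<std::size_t, float> workorders;
      float total_workorders = 0;
      for (const auto &entry : predicted_time) {
        const float share =
            (entry.second == 0)
                ? 0.0f
                : static_cast<float>(entry.first) / sum_priorities /
                      static_cast<float>(entry.second);
        workorders[entry.first] = share;
        total_workorders += share;
      }
      std::unordered_map<std::size_t, float> probabilities;
      if (total_workorders == 0) {
        return probabilities;  // No level can be selected; keep the old values.
      }
      for (const auto &entry : workorders) {
        probabilities[entry.first] = entry.second / total_workorders;
      }
      return probabilities;
    }

    int main() {
      // Priority 1 at 200 us per work order, priority 3 at 100 us per work order.
      // Priority 3 ends up with 6x the probability of priority 1
      // (3x the priority, and its work orders are 2x cheaper).
      for (const auto &entry : priorityLevelProbabilities({{1, 200}, {3, 100}})) {
        std::cout << "priority " << entry.first << ": " << entry.second << "\n";
      }
      return 0;
    }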
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
 #include <cstddef>
 #include <random>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : mt_(std::random_device()()) {}
+      : common_denominator_(1.0), mt_(std::random_device()()) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
     return individual_probabilities_.size();
   }
 
+  inline float getDenominator() const {
+    return common_denominator_;
+  }
+
   /**
    * @brief Add individual (not cumulative) probability for a given object.
    *
@@ -59,16 +64,48 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param property The property of the given object.
-   * @param individual_probability The individual (not cumulative) probability
-   *        of the given object.
+   * @param numerator The numerator for the given object.
    **/
-  void addProbability(const std::size_t property,
-                      const float individual_probability) {
-    individual_probabilities_[property] = individual_probability;
+  void addOrUpdateObject(const std::size_t property,
+                         const float numerator) {
+    DCHECK_LE(numerator, common_denominator_);
+    // The correct individual probability must be stored here for this object
+    // right away, because updateCumulativeProbabilities() relies on the
+    // probabilities of all the objects.
+    individual_probabilities_[property] =
+        std::make_pair(numerator, numerator / common_denominator_);
     updateCumulativeProbabilities();
   }
 
   /**
+   * @brief Add the individual (not cumulative) probability for a given object
+   *        with an updated denominator.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param numerator The numerator for the given object.
+   * @param new_denominator The updated denominator for the store.
+   **/
+  void addOrUpdateObjectNewDenominator(const std::size_t property,
+                                       const float numerator,
+                                       const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_LE(numerator, new_denominator);
+    common_denominator_ = new_denominator;
+    // It is alright to not have the correct probability in
+    // individual_probabilities_ for the newly added object at this point,
+    // because updateProbabilitiesNewDenominator() recomputes the
+    // probabilities for all the objects.
+    individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+    updateProbabilitiesNewDenominator();
+  }
+
+  /**
    * @brief Add individual (not cumulative) probabilities for given objects.
    *
    * @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param properties A vector of properties to be added.
-   * @param individual_probabilities The individual (not cumulative)
-   *        probabilities of the given objects.
+   * @param numerators The numerators of the given objects.
    **/
-  void addProbabilities(const std::vector<std::size_t> &properties,
-                        const std::vector<float> &individual_probabilities) {
-    DCHECK_EQ(properties.size(), individual_probabilities.size());
+  void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+                          const std::vector<float> &numerators) {
+    DCHECK_EQ(properties.size(), numerators.size());
     for (std::size_t i = 0; i < properties.size(); ++i) {
-      individual_probabilities_[properties[i]] = individual_probabilities[i];
+      DCHECK_LE(numerators[i], common_denominator_);
+      // The correct individual probability must be stored here for each
+      // object right away, because updateCumulativeProbabilities() relies on
+      // the probabilities of all the objects.
+      individual_probabilities_[properties[i]] =
+          std::make_pair(numerators[i], numerators[i] / common_denominator_);
     }
     updateCumulativeProbabilities();
   }
 
-  /**
-   * @brief Update  the probability of a given object to a new value.
-   *
-   * @param property The property of the object.
-   * @param new_individual_probability The new probability to be set.
-   **/
-  void updateProbability(const std::size_t property,
-                         const float new_individual_probability) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    it->second = new_individual_probability;
-    updateCumulativeProbabilities();
+  void addOrUpdateObjectsNewDenominator(
+      const std::vector<std::size_t> &properties,
+      const std::vector<float> &numerators,
+      const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_EQ(properties.size(), numerators.size());
+    common_denominator_ = new_denominator;
+    for (std::size_t i = 0; i < properties.size(); ++i) {
+      DCHECK_LE(numerators[i], common_denominator_);
+      // It is alright to not have the correct probability in
+      // individual_probabilities_ for the newly added objects at this point,
+      // because updateProbabilitiesNewDenominator() recomputes the
+      // probabilities for all the objects.
+      individual_probabilities_[properties[i]] =
+          std::make_pair(numerators[i], 0.0);
+    }
+    updateProbabilitiesNewDenominator();
   }
 
   /**
@@ -109,10 +156,24 @@ class ProbabilityStore {
    * @param property The property of the object to be removed.
    **/
   void removeObject(const std::size_t property) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    individual_probabilities_.erase(it);
-    updateCumulativeProbabilities();
+    auto individual_it = individual_probabilities_.find(property);
+    DCHECK(individual_it != individual_probabilities_.end());
+    individual_probabilities_.erase(individual_it);
+    if (!individual_probabilities_.empty()) {
+      float new_denominator = 0;
+      for (auto it = individual_probabilities_.begin();
+           it != individual_probabilities_.end();
+           ++it) {
+        new_denominator += it->second.first;
+      }
+      CHECK_GT(new_denominator, 0);
+      common_denominator_ = new_denominator;
+      updateProbabilitiesNewDenominator();
+    } else {
+      // In order to keep the store consistent, we should keep the sizes of
+      // individual_probabilities_ and cumulative_probabilities_ the same.
+      cumulative_probabilities_.clear();
+    }
   }
 
   /**
@@ -123,7 +184,7 @@ class ProbabilityStore {
   const float getIndividualProbability(const std::size_t property) const {
     const auto it = individual_probabilities_.find(property);
     DCHECK(it != individual_probabilities_.end());
-    return it->second;
+    return it->second.second;
   }
 
   /**
@@ -141,13 +202,13 @@ class ProbabilityStore {
       return;
     }
     float cumulative_probability = 0;
-    for (const auto property_probability_pair : individual_probabilities_) {
-      cumulative_probabilities_.emplace_back(property_probability_pair.first,
+    for (const auto p : individual_probabilities_) {
+      cumulative_probabilities_.emplace_back(p.first,
                                              cumulative_probability);
-      cumulative_probability += property_probability_pair.second;
+      cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that floating
-    // addition related rounding issues are ignored.
+    // Adjust the last cumulative probability manually to 1.0, so that
+    // floating addition related rounding issues are ignored.
     cumulative_probabilities_.back().updateProbability(1.0);
   }
 
@@ -208,9 +269,26 @@ class ProbabilityStore {
     return it->property_;
   }
 
-  std::unordered_map<std::size_t, float> individual_probabilities_;
+  inline void updateProbabilitiesNewDenominator() {
+    // First update the individual probabilities.
+    for (auto it = individual_probabilities_.begin();
+         it != individual_probabilities_.end();
+         ++it) {
+      DCHECK_LE(it->second.first, common_denominator_);
+      it->second.second = it->second.first / common_denominator_;
+    }
+    updateCumulativeProbabilities();
+  }
+
+  // Key = property of the object.
+  // Value = A pair in which:
+  // 1st element: Numerator of the object.
+  // 2nd element: Individual probability of the object.
+  std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
   std::vector<ProbabilityInfo> cumulative_probabilities_;
 
+  float common_denominator_;
+
   std::mt19937_64 mt_;
 
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);

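The store above pairs each object with a cumulative probability so that a
random property can be drawn from a single uniform sample. A simplified,
self-contained version of that sampling pattern (not the committed
implementation; it rebuilds the cumulative sums inline instead of caching them,
and assumes the weights are non-empty and sum to roughly 1):

    #include <cstddef>
    #include <iostream>
    #include <random>
    #include <vector>

    // Draw an index with probability proportional to weights[i].
    std::size_t pickWeighted(const std::vector<float> &weights,
                             std::mt19937_64 *mt) {
      // Build cumulative sums, e.g. {0.25, 0.75} -> {0.25, 1.0}.
      std::vector<float> cumulative;
      float running = 0;
      for (const float w : weights) {
        running += w;
        cumulative.push_back(running);
      }
      cumulative.back() = 1.0f;  // Absorb floating point rounding in the last slot.
      std::uniform_real_distribution<float> dist(0.0f, 1.0f);
      const float draw = dist(*mt);
      for (std::size_t i = 0; i < cumulative.size(); ++i) {
        if (draw <= cumulative[i]) {
          return i;
        }
      }
      return cumulative.size() - 1;
    }

    int main() {
      std::mt19937_64 mt(std::random_device()());
      const std::vector<float> weights = {0.1f, 0.2f, 0.3f, 0.4f};
      std::vector<std::size_t> counts(weights.size(), 0);
      for (int i = 0; i < 100000; ++i) {
        ++counts[pickWeighted(weights, &mt)];
      }
      // The counts come out roughly in a 10% / 20% / 30% / 40% split.
      for (std::size_t i = 0; i < counts.size(); ++i) {
        std::cout << "index " << i << ": " << counts[i] << "\n";
      }
      return 0;
    }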
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee5c63fb/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -28,14 +28,15 @@ TEST(ProbabilityStoreTest, CountTest) {
   ProbabilityStore store;
   EXPECT_EQ(0u, store.getNumObjects());
   const std::size_t kProperty = 0;
-  store.addProbability(kProperty, 0.5);
+  store.addOrUpdateObject(kProperty, 1);
   EXPECT_EQ(1u, store.getNumObjects());
   store.removeObject(kProperty);
   EXPECT_EQ(0u, store.getNumObjects());
 
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   EXPECT_EQ(objects.size(), store.getNumObjects());
 }
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
 TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
-    EXPECT_EQ(probabilities[object_num],
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
               store.getIndividualProbability(objects[object_num]));
   }
 }
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
 TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   const std::size_t kNumTrials = 10;
   while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   }
 }
 
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+              store.getIndividualProbability(objects[object_num]));
+  }
+
+  // Remove last object "9", with numerator 5.
+  store.removeObject(objects.back());
+  objects.pop_back();
+  numerators.pop_back();
+  const float expected_new_denominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+
+  EXPECT_EQ(expected_new_denominator, store.getDenominator());
+  // After the removal, the individual probabilities are relative to the
+  // recomputed denominator.
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(numerators[object_num] / expected_new_denominator,
+              store.getIndividualProbability(objects[object_num]));
+  }
+}
+
 }  // namespace quickstep