You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/18 04:41:30 UTC

[2/2] incubator-quickstep git commit: Suspend memory intensive queries when memory pressure is high.

Suspend memory intensive queries when memory pressure is high.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/14622e25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/14622e25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/14622e25

Branch: refs/heads/memory-estimate
Commit: 14622e255d8cb8f557b464d6927493ace773e910
Parents: f1e5e85
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jul 17 23:40:20 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Jul 17 23:40:20 2016 -0500

----------------------------------------------------------------------
 query_execution/PriorityPolicyEnforcer.cpp | 114 ++++++++++++++++++++++--
 query_execution/PriorityPolicyEnforcer.hpp |  17 ++++
 2 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/14622e25/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 37db4d0..11b9d70 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -67,7 +67,15 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
 }
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries) {
+  // Find a victim query to be suspended.
+  while (!admissionMemoryCheck(query_handle)) {
+    std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
+    if (victim_query.first != kInvalidQueryID) {
+      // We need to suspend this query - move it from admitted to suspended.
+      suspendQuery(victim_query.first);
+    }
+  }
+  // if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
     if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
@@ -82,13 +90,12 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       query_handle->setAdmissionTime();
       query_id_to_handle_[query_handle->query_id()] = query_handle;
       LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
-      LOG(INFO) << "Query " << query_handle->query_id() << " memory admissible? " << admissionMemoryCheck(query_handle);
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
       return false;
     }
-  } else {
+  /*} else {
     // This query will have to wait.
     std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
     if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
@@ -97,6 +104,40 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       waiting_queries_.push(query_handle);
     }
     return false;
+  }*/
+}
+
+// Tries to re-admit a previously suspended query. Returns true if the query
+// was admitted; false if admissionMemoryCheck() fails or the ID is already admitted.
+bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
+ if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                           catalog_database_, storage_manager_, bus_));
+      std::cout << "Admitted query with ID: " << query_handle->query_id()
+                 << " priority: " << query_handle->query_priority() << "\n";
+      priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
+      learner_->addQuery(*query_handle);
+      query_handle->setAdmissionTime();
+      query_id_to_handle_[query_handle->query_id()] = query_handle;
+      LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // admissionMemoryCheck() rejected the query: leave it suspended. Nothing is
+    // pushed onto waiting_queries_ here; the caller keeps the handle in
+    // suspended_queries_ unless this function returns true.
+    // NOTE(review): on success a fresh QueryManager is built above instead of
+    // reusing the one parked in suspended_query_managers_ - confirm that
+    // discarding the suspended manager's state is intended.
+    return false;
   }
 }
 
@@ -117,10 +158,13 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       operator_id = proto.operator_index();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
-      if (FLAGS_dynamic_probabilities_in_learner) {
-        learner_->addCompletionFeedback<true>(proto);
-      } else {
-        learner_->addCompletionFeedback<false>(proto);
+      if (!hasQuerySuspended(query_id)) {
+        // Add completion feedback for non-suspended queries.
+        if (FLAGS_dynamic_probabilities_in_learner) {
+          learner_->addCompletionFeedback<true>(proto);
+        } else {
+          learner_->addCompletionFeedback<false>(proto);
+        }
       }
       if (profile_individual_workorders_) {
         recordTimeForWorkOrder(proto);
@@ -176,6 +220,16 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
+    if (!suspended_queries_.empty()) {
+      // Admit a suspended query.
+      QueryHandle *suspended_query = suspended_queries_.back();
+      if (admitSuspendedQuery(suspended_query)) {
+        std::cout << "Admitting suspended query " << suspended_query->query_id() << " back\n";
+        suspended_queries_.pop_back();
+        suspended_query_managers_.erase(suspended_query->query_id());
+        return;
+      }
+    }
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
       QueryHandle *new_query = waiting_queries_.front();
@@ -294,6 +348,34 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
+// Moves an admitted query into the suspended set and subtracts its estimated memory from committed_memory_.
+void PriorityPolicyEnforcer::suspendQuery(const std::size_t query_id) {
+  // Check before touching the map: operator[] would silently insert the key.
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  suspended_query_managers_[query_id].reset(admitted_queries_[query_id].release());
+  suspended_queries_.push_back(query_id_to_handle_[query_id]);
+  admitted_queries_.erase(query_id);
+  // Remove the query from priority_query_ids_ structure.
+  const int query_priority = learner_->getQueryPriority(query_id);
+  DCHECK(query_priority != kInvalidPriorityLevel);
+  const std::size_t query_priority_unsigned = static_cast<std::size_t>(query_priority);
+  std::vector<std::size_t> *query_ids_for_priority_level =
+      &priority_query_ids_[query_priority_unsigned];
+  // Two-iterator erase: well-defined when query_id is absent, and drops every occurrence.
+  query_ids_for_priority_level->erase(
+      std::remove(query_ids_for_priority_level->begin(), query_ids_for_priority_level->end(), query_id),
+      query_ids_for_priority_level->end());
+  if (query_ids_for_priority_level->empty()) {
+    // No more queries for the given priority level. Remove the entry.
+    priority_query_ids_.erase(query_priority_unsigned);
+  }
+  // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
+  committed_memory_ -= query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+  // Remove the query from the learner.
+  learner_->removeQuery(query_id);
+  std::cout << "Suspended query: " << query_id << " with priority: " << query_priority << "\n";
+}
+
 bool PriorityPolicyEnforcer::admitQueries(
     const std::vector<QueryHandle*> &query_handles) {
   bool result = true;
@@ -401,4 +483,22 @@ const std::size_t PriorityPolicyEnforcer::getMemoryForQueryInBytes(const std::si
   return memory;
 }
 
+const std::pair<int, std::size_t> PriorityPolicyEnforcer::getQueryWithHighestMemoryFootprint() {
+  // Scan the admitted queries; only a strictly larger footprint replaces the current pick.
+  int heaviest_query_id = kInvalidQueryID;
+  std::size_t heaviest_footprint = 0;
+  for (const auto &admitted_entry : admitted_queries_) {
+    const std::size_t footprint = getMemoryForQueryInBytes(admitted_entry.first);
+    if (footprint <= heaviest_footprint) continue;
+    heaviest_footprint = footprint;
+    heaviest_query_id = static_cast<int>(admitted_entry.first);
+  }
+  return std::make_pair(heaviest_query_id, heaviest_footprint);
+}
+
+bool PriorityPolicyEnforcer::hasQuerySuspended(const std::size_t query_id) const {
+  // True iff some handle in suspended_queries_ carries the given query ID.
+  return std::any_of(suspended_queries_.begin(), suspended_queries_.end(), [query_id](const QueryHandle *h) { return h->query_id() == query_id; });
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/14622e25/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 33d0e1f..0659230 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -19,6 +19,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
 
 #include <cstddef>
+#include <deque>
 #include <memory>
 #include <queue>
 #include <tuple>
@@ -195,6 +196,17 @@ class PriorityPolicyEnforcer {
 
   const std::size_t getMemoryForQueryInBytes(const std::size_t query_id);
 
+  // Return a pair:
+  // 1st element - Query ID (kInvalidQueryID if no such query)
+  // 2nd element - Memory footprint (0 for invalid query ID).
+  const std::pair<int, std::size_t> getQueryWithHighestMemoryFootprint();
+
+  void suspendQuery(const std::size_t query_id);
+
+  bool hasQuerySuspended(const std::size_t query_id) const;
+
+  bool admitSuspendedQuery(QueryHandle *query_handle);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -215,6 +227,11 @@ class PriorityPolicyEnforcer {
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 
+  // The queries which have been suspended.
+  std::vector<QueryHandle*> suspended_queries_;
+
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> suspended_query_managers_;
+
   // Key = query ID, value = a pointer to the QueryHandle.
   // Note - This map has entries for active and waiting queries only.
   std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;