You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/18 04:41:30 UTC
[2/2] incubator-quickstep git commit: Suspend memory intensive
queries when memory pressure is high.
Suspend memory intensive queries when memory pressure is high.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/14622e25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/14622e25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/14622e25
Branch: refs/heads/memory-estimate
Commit: 14622e255d8cb8f557b464d6927493ace773e910
Parents: f1e5e85
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jul 17 23:40:20 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Jul 17 23:40:20 2016 -0500
----------------------------------------------------------------------
query_execution/PriorityPolicyEnforcer.cpp | 114 ++++++++++++++++++++++--
query_execution/PriorityPolicyEnforcer.hpp | 17 ++++
2 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/14622e25/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 37db4d0..11b9d70 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -67,7 +67,15 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
}
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries) {
+ // Find a victim query to be suspended.
+ while (!admissionMemoryCheck(query_handle)) {
+ std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
+ if (victim_query.first != kInvalidQueryID) {
+ // We need to suspend this query - move it from admitted to suspended.
+ suspendQuery(victim_query.first);
+ }
+ }
+ // if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
@@ -82,13 +90,12 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
query_handle->setAdmissionTime();
query_id_to_handle_[query_handle->query_id()] = query_handle;
LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
- LOG(INFO) << "Query " << query_handle->query_id() << " memory admissible? " << admissionMemoryCheck(query_handle);
return true;
} else {
LOG(ERROR) << "Query with the same ID " << query_id << " exists";
return false;
}
- } else {
+ /*} else {
// This query will have to wait.
std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
@@ -97,6 +104,40 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
waiting_queries_.push(query_handle);
}
return false;
+ }*/
+}
+
+// Tries to bring a previously suspended query back into the admitted set.
+//
+// The query is admitted only if admissionMemoryCheck() accepts it and no
+// admitted query already uses the same ID. On success a new QueryManager is
+// created for it and the query is registered in priority_query_ids_, the
+// learner and query_id_to_handle_.
+//
+// Returns true if the query was admitted; false if it has to stay suspended
+// or an admitted query with the same ID already exists.
+bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
+  if (!admissionMemoryCheck(query_handle)) {
+    // Memory check failed: let the query remain in the suspended mode.
+    return false;
+  } else {
+    const std::size_t query_id = query_handle->query_id();
+    const bool id_already_admitted =
+        admitted_queries_.find(query_id) != admitted_queries_.end();
+    if (id_already_admitted) {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+
+    // No admitted query has this ID, so it is ok to admit.
+    admitted_queries_[query_id].reset(
+        new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                         catalog_database_, storage_manager_, bus_));
+    std::cout << "Admitted query with ID: " << query_handle->query_id()
+              << " priority: " << query_handle->query_priority() << "\n";
+    priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
+    learner_->addQuery(*query_handle);
+    query_handle->setAdmissionTime();
+    query_id_to_handle_[query_handle->query_id()] = query_handle;
+    LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: "
+              << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+    return true;
}
}
@@ -117,10 +158,13 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
operator_id = proto.operator_index();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
- if (FLAGS_dynamic_probabilities_in_learner) {
- learner_->addCompletionFeedback<true>(proto);
- } else {
- learner_->addCompletionFeedback<false>(proto);
+ if (!hasQuerySuspended(query_id)) {
+ // Add completion feedback for non-suspended queries.
+ if (FLAGS_dynamic_probabilities_in_learner) {
+ learner_->addCompletionFeedback<true>(proto);
+ } else {
+ learner_->addCompletionFeedback<false>(proto);
+ }
}
if (profile_individual_workorders_) {
recordTimeForWorkOrder(proto);
@@ -176,6 +220,16 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
// NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
+ if (!suspended_queries_.empty()) {
+ // Admit a suspended query.
+ QueryHandle *suspended_query = suspended_queries_.back();
+ if (admitSuspendedQuery(suspended_query)) {
+ std::cout << "Admitting suspended query " << suspended_query->query_id() << " back\n";
+ suspended_queries_.pop_back();
+ suspended_query_managers_.erase(suspended_query->query_id());
+ return;
+ }
+ }
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
QueryHandle *new_query = waiting_queries_.front();
@@ -294,6 +348,34 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
+// Moves an admitted query into the suspended set.
+//
+// The query's QueryManager is transferred from admitted_queries_ to
+// suspended_query_managers_, its handle is appended to suspended_queries_,
+// and the query is removed from priority_query_ids_ and from the learner.
+// Its estimated maximum memory is subtracted from committed_memory_.
+//
+// query_id must refer to a currently admitted query that has a registered
+// handle; otherwise an error is logged and no state is changed.
+void PriorityPolicyEnforcer::suspendQuery(const std::size_t query_id) {
+  // Use find() rather than operator[]: operator[] default-inserts an empty
+  // entry for an unknown ID, which would make the checks below vacuous and
+  // leave a null manager/handle behind.
+  auto admitted_it = admitted_queries_.find(query_id);
+  auto handle_it = query_id_to_handle_.find(query_id);
+  DCHECK(admitted_it != admitted_queries_.end());
+  DCHECK(handle_it != query_id_to_handle_.end());
+  if (admitted_it == admitted_queries_.end() ||
+      handle_it == query_id_to_handle_.end()) {
+    LOG(ERROR) << "Cannot suspend query " << query_id
+               << ": it is not currently admitted";
+    return;
+  }
+  QueryHandle *query_handle = handle_it->second;
+
+  // Transfer ownership of the QueryManager from admitted to suspended.
+  suspended_query_managers_[query_id].reset(admitted_it->second.release());
+  suspended_queries_.push_back(query_handle);
+  admitted_queries_.erase(admitted_it);
+
+  // Remove the query from priority_query_ids_ structure. The priority must be
+  // read before the query is removed from the learner below.
+  const int query_priority = learner_->getQueryPriority(query_id);
+  DCHECK(query_priority != kInvalidPriorityLevel);
+  const std::size_t query_priority_unsigned =
+      static_cast<std::size_t>(query_priority);
+  std::vector<std::size_t> *query_ids_for_priority_level =
+      &priority_query_ids_[query_priority_unsigned];
+  // Erase-remove idiom: the end iterator is required. The single-iterator
+  // erase() overload is undefined behavior when the ID is not in the vector.
+  query_ids_for_priority_level->erase(
+      std::remove(query_ids_for_priority_level->begin(),
+                  query_ids_for_priority_level->end(),
+                  query_id),
+      query_ids_for_priority_level->end());
+  if (query_ids_for_priority_level->empty()) {
+    // No more queries for the given priority level. Remove the entry.
+    priority_query_ids_.erase(query_priority_unsigned);
+  }
+
+  // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
+  const std::size_t estimated_memory_bytes =
+      query_handle->getEstimatedMaxMemoryInBytes();
+  committed_memory_ -= estimated_memory_bytes;
+
+  // Remove the query from the learner.
+  learner_->removeQuery(query_id);
+  std::cout << "Suspended query: " << query_id
+            << " with priority: " << query_priority << "\n";
+}
+
bool PriorityPolicyEnforcer::admitQueries(
const std::vector<QueryHandle*> &query_handles) {
bool result = true;
@@ -401,4 +483,22 @@ const std::size_t PriorityPolicyEnforcer::getMemoryForQueryInBytes(const std::si
return memory;
}
+// Finds the admitted query with the largest memory footprint, as reported by
+// getMemoryForQueryInBytes().
+//
+// Returns (query ID, footprint in bytes). If no admitted query has a
+// footprint greater than zero, returns (kInvalidQueryID, 0).
+const std::pair<int, std::size_t> PriorityPolicyEnforcer::getQueryWithHighestMemoryFootprint() {
+  int heaviest_query_id = kInvalidQueryID;
+  std::size_t heaviest_footprint = 0;
+  for (const auto &admitted_entry : admitted_queries_) {
+    const std::size_t footprint = getMemoryForQueryInBytes(admitted_entry.first);
+    if (footprint <= heaviest_footprint) {
+      // Not strictly larger than the best seen so far; keep the earlier one.
+      continue;
+    }
+    heaviest_footprint = footprint;
+    heaviest_query_id = static_cast<int>(admitted_entry.first);
+  }
+  return std::make_pair(heaviest_query_id, heaviest_footprint);
+}
+
+// Returns true if suspended_queries_ holds a handle whose query ID equals
+// query_id, false otherwise.
+bool PriorityPolicyEnforcer::hasQuerySuspended(const std::size_t query_id) const {
+  for (const QueryHandle *suspended_handle : suspended_queries_) {
+    if (suspended_handle->query_id() == query_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/14622e25/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 33d0e1f..0659230 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -19,6 +19,7 @@
#define QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
#include <cstddef>
+#include <deque>
#include <memory>
#include <queue>
#include <tuple>
@@ -195,6 +196,17 @@ class PriorityPolicyEnforcer {
const std::size_t getMemoryForQueryInBytes(const std::size_t query_id);
+ // Return a pair describing the admitted query with the largest memory footprint:
+ // 1st element - Query ID (kInvalidQueryID if no such query)
+ // 2nd element - Memory footprint in bytes (0 for invalid query ID).
+ const std::pair<int, std::size_t> getQueryWithHighestMemoryFootprint();
+ // Move an admitted query (and its QueryManager) to the suspended set, and remove it from the priority structure and the learner.
+ void suspendQuery(const std::size_t query_id);
+ // Return true if a handle with the given query ID is present in suspended_queries_.
+ bool hasQuerySuspended(const std::size_t query_id) const;
+ // Try to admit a suspended query again. Return true if it was admitted, false if it must stay suspended or its ID is already admitted.
+ bool admitSuspendedQuery(QueryHandle *query_handle);
+
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
@@ -215,6 +227,11 @@ class PriorityPolicyEnforcer {
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
+ // The queries which have been suspended. suspendQuery() appends handles at the back, and re-admission takes the handle at the back first.
+ std::vector<QueryHandle*> suspended_queries_;
+ // Key = query ID, value = the QueryManager moved out of admitted_queries_ when the query was suspended.
+ std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> suspended_query_managers_;
+
// Key = query ID, value = a pointer to the QueryHandle.
// Note - This map has entries for active and waiting queries only.
std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;