You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/14 04:20:58 UTC
incubator-quickstep git commit: Check memory availability before
admitting a query.
Repository: incubator-quickstep
Updated Branches:
refs/heads/memory-estimate 1018c56b4 -> 0b973b38e
Check memory availability before admitting a query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0b973b38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0b973b38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0b973b38
Branch: refs/heads/memory-estimate
Commit: 0b973b38efdbdbe287755d1dde629d35e8268859
Parents: 1018c56
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jul 13 23:19:58 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jul 13 23:19:58 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/PriorityPolicyEnforcer.cpp | 51 +++++++++++++++++++------
query_execution/PriorityPolicyEnforcer.hpp | 4 ++
3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 9ab86b2..11e0e1d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_relationaloperators_WorkOrder
+ quickstep_storage_StorageManager
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 3d151db..f426568 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -17,6 +17,7 @@
#include "query_execution/PriorityPolicyEnforcer.hpp"
+#include <algorithm>
#include <cstddef>
#include <memory>
#include <queue>
@@ -32,6 +33,7 @@
#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageManager.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -57,12 +59,13 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
storage_manager_(storage_manager),
worker_directory_(worker_directory),
bus_(bus),
- profile_individual_workorders_(profile_individual_workorders) {
+ profile_individual_workorders_(profile_individual_workorders),
+ committed_memory_(0) {
learner_.reset(new Learner());
}
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- if (admitted_queries_.size() < kMaxConcurrentQueries) {
+ if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
@@ -70,13 +73,14 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
admitted_queries_[query_id].reset(
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
- DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id()
- << " priority: " << query_handle->query_priority();
+ std::cout << "Admitted query with ID: " << query_handle->query_id()
+ << " priority: " << query_handle->query_priority() << "\n";
priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
query_handle->setAdmissionTime();
query_id_to_handle_[query_handle->query_id()] = query_handle;
LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+ LOG(INFO) << "Query " << query_handle->query_id() << " memory admissible? " << admissionMemoryCheck(query_handle);
return true;
} else {
LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -84,9 +88,12 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
}
} else {
// This query will have to wait.
- DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
- query_id_to_handle_[query_handle->query_id()] = query_handle;
- waiting_queries_.push(query_handle);
+ std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
+ if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
+ // This query was not waitlisted earlier.
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
+ waiting_queries_.push(query_handle);
+ }
return false;
}
}
@@ -166,8 +173,11 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
QueryHandle *new_query = waiting_queries_.front();
- waiting_queries_.pop();
- admitQuery(new_query);
+ // waiting_queries_.pop();
+ if (admitQuery(new_query)) {
+ std::cout << "Removing Q " << new_query->query_id() << " from waitlist\n";
+ waiting_queries_.pop();
+ }
}
} else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
learner_->removeOperator(query_id, operator_id);
@@ -237,6 +247,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
priority_query_ids_.erase(query_priority_unsigned);
}
query_id_to_handle_[query_id]->setCompletionTime();
+ const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+ committed_memory_ -= estimated_memory_bytes;
// Remove the query from the learner.
learner_->removeQuery(query_id);
// TODO(harshad) - Admit waiting queries, if any.
@@ -245,12 +257,13 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
bool PriorityPolicyEnforcer::admitQueries(
const std::vector<QueryHandle*> &query_handles) {
+ bool result = true;  // Stays true only if every handle in query_handles is admitted.
for (QueryHandle *curr_query : query_handles) {
if (!admitQuery(curr_query)) {
- return false;
+ result = false;  // Record the failure but keep looping, so the remaining handles are still passed to admitQuery().
}
}
- return true;
+ return result;  // false if at least one admitQuery() call above returned false.
}
void PriorityPolicyEnforcer::recordTimeForWorkOrder(
@@ -324,4 +337,20 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
return nullptr;
}
+bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {  // Returns true if the query may be admitted; on the non-empty path a true result also adds the query's estimate to committed_memory_.
+ if (admitted_queries_.empty()) {
+ // No query running in the system, let the query in.
+ return true;  // NOTE(review): nothing is added to committed_memory_ on this path, while the removeQuery() hunk subtracts the estimate from this std::size_t counter -- confirm it cannot wrap.
+ }
+ const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
+ const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
+ const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());  // Slots for the size the storage manager reports via getMemorySize().
+ const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);  // Slots for the estimates accumulated in committed_memory_ so far.
+ if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {  // Strict '<': a query that would bring the total exactly to getMaxBufferPoolSlots() is rejected.
+ committed_memory_ += estimated_memory_requirement_bytes;  // NOTE(review): side effect inside a "check"; admitQuery() calls this function again from a LOG statement, so the estimate may be added a second time -- verify.
+ return true;
+ }
+ return false;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index b20d831..4d5eb6a 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -189,6 +189,8 @@ class PriorityPolicyEnforcer {
const std::size_t priority_level,
std::unordered_map<std::size_t, bool> *finished_queries_ids);
+ bool admissionMemoryCheck(const QueryHandle *query_handle);
+
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
@@ -226,6 +228,8 @@ class PriorityPolicyEnforcer {
std::unique_ptr<Learner> learner_;
+ std::size_t committed_memory_;
+
DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
};