You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/20 04:31:19 UTC
incubator-quickstep git commit: Changes for load controller experiment.
Repository: incubator-quickstep
Updated Branches:
refs/heads/memory-estimate 14622e255 -> 21a367f00
Changes for load controller experiment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/21a367f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/21a367f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/21a367f0
Branch: refs/heads/memory-estimate
Commit: 21a367f00debb314c9144dc6d5e00fdf3463c625
Parents: 14622e2
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jul 19 23:30:51 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 19 23:30:51 2016 -0500
----------------------------------------------------------------------
query_execution/Foreman.cpp | 48 ++++++++-
query_execution/Foreman.hpp | 12 +++
query_execution/PriorityPolicyEnforcer.cpp | 136 +++++++++++++-----------
query_execution/PriorityPolicyEnforcer.hpp | 5 +-
4 files changed, 136 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 40d6e5c..a7027d9 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -24,6 +24,7 @@
#include <utility>
#include <vector>
+#include "cli/InputParserUtil.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -49,6 +50,8 @@ namespace quickstep {
DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
"of pending work orders for the worker. This information is used "
"by the Foreman to assign work orders to worker threads");
+DEFINE_string(high_priority_queries_entry_points, "", "A comma separated list of entry points for high priority queries, each of which is defined in terms of milliseconds since the beginning of workload execution");
+DEFINE_uint64(num_high_priority_queries, 1, "Number of high priority queries to be admitted to the system");
Foreman::Foreman(const tmb::client_id main_thread_client_id,
WorkerDirectory *worker_directory,
@@ -62,7 +65,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
main_thread_client_id_(main_thread_client_id),
worker_directory_(DCHECK_NOTNULL(worker_directory)),
catalog_database_(DCHECK_NOTNULL(catalog_database)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ start_time_(std::chrono::steady_clock::now()),
+ high_priority_queries_injection_points_(InputParserUtil::ParseWorkerAffinities(FLAGS_num_high_priority_queries, FLAGS_high_priority_queries_entry_points))
+ {
const std::vector<QueryExecutionMessageType> sender_message_types{
kPoisonMessage,
kRebuildWorkOrderMessage,
@@ -95,6 +101,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
worker_directory_,
bus_,
profile_individual_workorders));
+
+ CHECK(FLAGS_num_high_priority_queries == high_priority_queries_injection_points_.size()) << "Number of high priority queries should be same as number of entry points";
+
+ high_priority_queries_admitted_.resize(FLAGS_num_high_priority_queries, false);
}
void Foreman::run() {
@@ -125,6 +135,15 @@ void Foreman::run() {
const AdmitRequestMessage *msg =
static_cast<const AdmitRequestMessage *>(tagged_message.message());
const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+ vector<QueryHandle*> reduced_query_handles_list;
+ CHECK(query_handles.size() > FLAGS_num_high_priority_queries) << "Number of high priority queries should be less than total number of queries";
+ for (std::size_t i = 0; i < query_handles.size(); ++i) {
+ if (i < query_handles.size() - FLAGS_num_high_priority_queries) {
+ reduced_query_handles_list.push_back(query_handles[i]);
+ } else {
+ high_priority_query_handles_.push(query_handles[i]);
+ }
+ }
DCHECK(!query_handles.empty());
bool all_queries_admitted = true;
@@ -132,7 +151,7 @@ void Foreman::run() {
all_queries_admitted =
policy_enforcer_->admitQuery(query_handles.front());
} else {
- all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ all_queries_admitted = policy_enforcer_->admitQueries(reduced_query_handles_list);
}
if (!all_queries_admitted) {
LOG(WARNING) << "The scheduler could not admit all the queries";
@@ -157,6 +176,8 @@ void Foreman::run() {
dispatchWorkerMessages(new_messages);
}
+ checkAndAdmitHighPriorityQueries();
+
// We check again, as some queries may produce zero work orders and finish
// their execution.
if (!policy_enforcer_->hasQueries()) {
@@ -252,4 +273,27 @@ void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
}
}
+/**
+ * @brief Admit at most one pending high priority query whose injection
+ *        point has elapsed.
+ *
+ * The first entry of high_priority_queries_admitted_ that is still false is
+ * the only one examined in a call. The time elapsed since start_time_ (in
+ * milliseconds) is compared against the matching entry of
+ * high_priority_queries_injection_points_; if the injection point has passed,
+ * the handle at the front of high_priority_query_handles_ is dequeued and
+ * handed to the policy enforcer, and the entry is marked as admitted.
+ *
+ * @return true if a query handle was dequeued and passed to the policy
+ *         enforcer in this call (even if the enforcer declined it), false
+ *         if nothing was due or no handle was available.
+ **/
+bool Foreman::checkAndAdmitHighPriorityQueries() {
+  for (std::size_t i = 0; i < high_priority_queries_admitted_.size(); ++i) {
+    if (high_priority_queries_admitted_[i]) {
+      continue;
+    }
+    // Check the timestamp.
+    const double time_in_millis =
+        std::chrono::duration<double, std::milli>(
+            std::chrono::steady_clock::now() - start_time_).count();
+    if (time_in_millis <= high_priority_queries_injection_points_[i]) {
+      // Wait for some more time to admit this query.
+      return false;
+    }
+    if (high_priority_query_handles_.empty()) {
+      // front()/pop() on an empty std::queue is undefined behaviour. This can
+      // happen when the injection point elapses before any handle has been
+      // queued; keep the entry pending so it is retried on a later call.
+      return false;
+    }
+    // Admit the query.
+    QueryHandle *next_query_handle = high_priority_query_handles_.front();
+    high_priority_query_handles_.pop();
+    if (!policy_enforcer_->admitQuery(next_query_handle)) {
+      LOG(INFO) << "Could not admit query with ID: " << next_query_handle->query_id();
+    }
+    // The entry is marked even when admission fails: the handle has already
+    // been consumed from the queue and is not retried.
+    high_priority_queries_admitted_[i] = true;
+    return true;
+  }
+  return false;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index c38a3e6..d2db48a 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+#include <chrono>
#include <cstddef>
#include <cstdio>
#include <memory>
@@ -121,6 +122,8 @@ class Foreman final : public ForemanLite {
**/
bool canCollectNewMessages(const tmb::message_type_id message_type);
+ bool checkAndAdmitHighPriorityQueries();
+
const tmb::client_id main_thread_client_id_;
WorkerDirectory *worker_directory_;
@@ -130,6 +133,15 @@ class Foreman final : public ForemanLite {
std::unique_ptr<PriorityPolicyEnforcer> policy_enforcer_;
+ // Start time for Foreman.
+ const std::chrono::steady_clock::time_point start_time_;
+ // Whether high priority queries have been admitted to the system.
+ std::vector<bool> high_priority_queries_admitted_;
+ // Defined in terms of number of milliseconds.
+ const std::vector<int> high_priority_queries_injection_points_;
+ std::queue<QueryHandle*> high_priority_query_handles_;
+
+
DISALLOW_COPY_AND_ASSIGN(Foreman);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 11b9d70..a238b8f 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,65 +62,68 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
worker_directory_(worker_directory),
bus_(bus),
profile_individual_workorders_(profile_individual_workorders),
- committed_memory_(0) {
+ committed_memory_(0),
+ suspended_memory_(0) {
learner_.reset(new Learner());
}
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
// Find a victim query to be suspended.
- while (!admissionMemoryCheck(query_handle)) {
- std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
- if (victim_query.first != kInvalidQueryID) {
- // We need to suspend this query - move it from admitted to suspended.
- suspendQuery(victim_query.first);
+ bool memory_available = admissionMemoryCheck(query_handle);
+ if (!memory_available) {
+ while (!memory_available) {
+ std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
+ if (victim_query.first != kInvalidQueryID) {
+ // We need to suspend this query - move it from admitted to suspended.
+ suspendQuery(victim_query.first);
+ memory_available = admissionMemoryCheck(query_handle);
+ } else {
+ std::cout << "No victim found, okay to admit query " << query_handle->query_id() << "\n";
+ break;
+ }
}
}
- // if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
- // Ok to admit the query.
- const std::size_t query_id = query_handle->query_id();
- if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
- // Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
- std::cout << "Admitted query with ID: " << query_handle->query_id()
- << " priority: " << query_handle->query_priority() << "\n";
- priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
- learner_->addQuery(*query_handle);
- query_handle->setAdmissionTime();
- query_id_to_handle_[query_handle->query_id()] = query_handle;
- LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
- return true;
- } else {
- LOG(ERROR) << "Query with the same ID " << query_id << " exists";
- return false;
- }
- /*} else {
- // This query will have to wait.
- std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
- if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
- // This query was not waitlisted earlier.
- query_id_to_handle_[query_handle->query_id()] = query_handle;
- waiting_queries_.push(query_handle);
- }
+ // Ok to admit the query.
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+ // Query with the same ID not present, ok to admit.
+ admitted_queries_[query_id].reset(
+ new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+ catalog_database_, storage_manager_, bus_));
+ std::cout << "Admitted query with ID: " << query_handle->query_id()
+ << " priority: " << query_handle->query_priority() << "\n";
+ priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
+ learner_->addQuery(*query_handle);
+ query_handle->setAdmissionTime();
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
+ LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+ return true;
+ } else {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
return false;
- }*/
+ }
}
bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
- if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
+ if (admissionMemoryCheck(query_handle)) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
+ // As we deducted the current query footprint from the committed memory in
+ // suspendQuery() we need to add it back.
+ const std::size_t curr_query_footprint = getMemoryForQueryInBytes(query_id);
+ suspended_memory_ -= curr_query_footprint;
if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
// Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
- std::cout << "Admitted query with ID: " << query_handle->query_id()
+ // Don't create a new QueryManager instance, it has already been created.
+ // Just move it from suspended_query_managers_ to admitted_queries_.
+ DCHECK(suspended_query_managers_.find(query_id) != suspended_query_managers_.end());
+ DCHECK(suspended_query_managers_.at(query_id) != nullptr);
+ admitted_queries_[query_id].reset(suspended_query_managers_[query_id].release());
+ std::cout << "Admitted suspended query with ID: " << query_handle->query_id()
<< " priority: " << query_handle->query_priority() << "\n";
priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
- query_handle->setAdmissionTime();
+ // query_handle->setAdmissionTime();
query_id_to_handle_[query_handle->query_id()] = query_handle;
LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
return true;
@@ -130,13 +133,6 @@ bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
}
} else {
// Let the query be in the suspended mode.
- // This query will have to wait.
- /*std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
- if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
- // This query was not waitlisted earlier.
- query_id_to_handle_[query_handle->query_id()] = query_handle;
- waiting_queries_.push(query_handle);
- }*/
return false;
}
}
@@ -214,23 +210,32 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
default:
LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer";
}
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- const QueryManager::QueryStatusCode return_code =
- admitted_queries_[query_id]->processMessage(tagged_message);
+ QueryManager::QueryStatusCode return_code = QueryManager::QueryStatusCode::kNone;
+ if (!hasQuerySuspended(query_id)) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ return_code = admitted_queries_[query_id]->processMessage(tagged_message);
+ } else {
+ DCHECK(suspended_query_managers_.find(query_id) != suspended_query_managers_.end());
+ return_code = suspended_query_managers_[query_id]->processMessage(tagged_message);
+ }
// NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
if (!suspended_queries_.empty()) {
// Admit a suspended query.
QueryHandle *suspended_query = suspended_queries_.back();
- if (admitSuspendedQuery(suspended_query)) {
+ while (admitSuspendedQuery(suspended_query)) {
+ // Until we can admit more suspended queries ...
std::cout << "Admitting suspended query " << suspended_query->query_id() << " back\n";
suspended_queries_.pop_back();
suspended_query_managers_.erase(suspended_query->query_id());
- return;
+ if (!suspended_queries_.empty()) {
+ suspended_query = suspended_queries_.back();
+ } else {
+ break;
+ }
}
- }
- if (!waiting_queries_.empty()) {
+ } else if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
QueryHandle *new_query = waiting_queries_.front();
// waiting_queries_.pop();
@@ -345,6 +350,7 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
// Remove the query from the learner.
learner_->removeQuery(query_id);
// TODO(harshad) - Admit waiting queries, if any.
+ query_id_to_handle_.erase(query_id);
DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
@@ -368,12 +374,14 @@ void PriorityPolicyEnforcer::suspendQuery(const std::size_t query_id) {
// No more queries for the given priority level. Remove the entry.
priority_query_ids_.erase(query_priority_unsigned);
}
- // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+ // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
committed_memory_ -= estimated_memory_bytes;
+ const std::size_t curr_query_footprint = getMemoryForQueryInBytes(query_id);
+ suspended_memory_ += curr_query_footprint;
// Remove the query from the learner.
learner_->removeQuery(query_id);
- std::cout << "Suspended query: " << query_id << " with priority: " << query_priority;
+ std::cout << "Suspended query: " << query_id << " with priority: " << query_priority << "\n";
}
bool PriorityPolicyEnforcer::admitQueries(
@@ -461,13 +469,16 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {
if (admitted_queries_.empty()) {
// No query running in the system, let the query in.
+ const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
+ committed_memory_ += estimated_memory_requirement_bytes;
return true;
}
const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
- const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());
+ const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize() - ((suspended_memory_ > 0) ? suspended_memory_ : 0));
const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);
- if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {
+ /*std::cout << "Requested: " << std::max(committed_slots, current_slots) + estimated_slots << " Current: " << current_slots << " Limit: " << 0.8 * float(storage_manager_->getMaxBufferPoolSlots()) << "\n";*/
+ if (std::max(committed_slots, current_slots) + estimated_slots < (0.8 * float(storage_manager_->getMaxBufferPoolSlots()))) {
committed_memory_ += estimated_memory_requirement_bytes;
return true;
}
@@ -478,8 +489,11 @@ const std::size_t PriorityPolicyEnforcer::getMemoryForQueryInBytes(const std::si
DCHECK(query_id_to_handle_.find(query_id) != query_id_to_handle_.end());
QueryHandle *query_handle = query_id_to_handle_[query_id];
std::size_t memory = query_handle->getMemoryTempRelationsBytes();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- memory += admitted_queries_[query_id]->getMemoryBytes();
+ if (!hasQuerySuspended(query_id)) {
+ memory += admitted_queries_[query_id]->getMemoryBytes();
+ } else {
+ memory += suspended_query_managers_[query_id]->getMemoryBytes();
+ }
return memory;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 0659230..08c3ada 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -138,7 +138,7 @@ class PriorityPolicyEnforcer {
* the policy enforcer doesn't have any query.
**/
inline bool hasQueries() const {
- return !(admitted_queries_.empty() && waiting_queries_.empty());
+ return !(admitted_queries_.empty() && waiting_queries_.empty() && suspended_query_managers_.empty());
}
/**
@@ -249,7 +249,8 @@ class PriorityPolicyEnforcer {
std::unique_ptr<Learner> learner_;
- std::size_t committed_memory_;
+ long committed_memory_;
+ long suspended_memory_;
DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
};