You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 04:41:44 UTC
[14/18] incubator-quickstep git commit: Select query from learner in
PolicyEnforcer.
Select query from learner in PolicyEnforcer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f379b468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f379b468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f379b468
Branch: refs/heads/scheduler++
Commit: f379b46871d3855cc1c59fe659e9050e4ae8f69e
Parents: 864e5f6
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 12:25:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 23:41:05 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 5 +-
query_execution/Learner.hpp | 31 +++++----
query_execution/PriorityPolicyEnforcer.cpp | 86 +++++++++++++++++++++----
query_execution/PriorityPolicyEnforcer.hpp | 19 ++++++
4 files changed, 112 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f379b468/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 11c3735..bb24baa 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -49,7 +49,7 @@ void Learner::addCompletionFeedback(
&workorder_completion_proto) {
const std::size_t query_id = workorder_completion_proto.query_id();
DCHECK(isQueryPresent(query_id));
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
ExecutionStats *execution_stats = getExecutionStats(query_id);
DCHECK(execution_stats != nullptr);
execution_stats->addEntry(
@@ -106,12 +106,10 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
query_id,
1 / static_cast<float>(mean_workorders_per_query[query_id]),
denominator);
- // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
// the older probabilities.
- // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
return;
}
}
@@ -248,7 +246,6 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
probabilities_of_priority_levels_->removeObject(priority_level);
// NOTE(harshad) : Not using this cache as it gets confusing.
// has_feedback_from_all_queries_.erase(priority_level);
- // LOG(INFO) << "Removed priority level: " << priority_level;
if (hasActiveQueries()) {
if (static_cast<int>(priority_level) == highest_priority_level_) {
// The priority level to be removed is the highest priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f379b468/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 2c6fdef..8634842 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -70,7 +70,7 @@ class Learner {
**/
void removeQuery(const std::size_t query_id) {
DCHECK(isQueryPresent(query_id));
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
// Find the iterator to the query in execution_stats_.
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
@@ -158,9 +158,6 @@ class Learner {
if (hasActiveQueries()) {
const int result = static_cast<int>(
probabilities_of_priority_levels_->pickRandomProperty());
- /*LOG(INFO) << "Random priority level: " << result << " has "
- << current_probabilities_.find(result)->second->getNumObjects()
- << " queries";*/
return result;
} else {
return kInvalidPriorityLevel;
@@ -182,10 +179,8 @@ class Learner {
DCHECK_GT(random_priority_level, 0);
const int result = pickRandomQueryFromPriorityLevel(
static_cast<std::size_t>(random_priority_level));
- // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
return result;
} else {
- // LOG(INFO) << "No active query right now";
return kInvalidQueryID;
}
}
@@ -209,7 +204,6 @@ class Learner {
return static_cast<int>(
current_probabilities_.at(priority_level)->pickRandomProperty());
}
- // LOG(INFO) << "No queries in priority level: " << priority_level;
} else {
DCHECK(default_probabilities_.at(priority_level) != nullptr);
const auto it = default_probabilities_.find(priority_level);
@@ -217,12 +211,24 @@ class Learner {
return static_cast<int>(
default_probabilities_.at(priority_level)->pickRandomProperty());
}
- // LOG(INFO) << "No queries in priority level: " << priority_level;
}
}
return kInvalidQueryID;
}
+ /**
+ * @brief Given a query ID, if the query exists in the learner, return its
+ * priority, otherwise return kInvalidPriorityLevel.
+ **/
+ inline int getQueryPriority(const std::size_t query_id) const {
+ const auto it = query_id_to_priority_lookup_.find(query_id);
+ if (it != query_id_to_priority_lookup_.end()) {
+ return it->second;
+ } else {
+ return kInvalidPriorityLevel;
+ }
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -324,7 +330,7 @@ class Learner {
if (isQueryPresent(query_id)) {
const auto stats_iter = getExecutionStatsIterMutable(query_id);
DCHECK(stats_iter !=
- std::end(execution_stats_[getQueryPriority(query_id)]));
+ std::end(execution_stats_[getQueryPriorityUnsafe(query_id)]));
return stats_iter->second.get();
}
return nullptr;
@@ -339,7 +345,7 @@ class Learner {
inline std::vector<
std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
getExecutionStatsIterMutable(const std::size_t query_id) {
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_[priority_level];
// The following line uses std::find_if to reach to the desired element
@@ -356,8 +362,11 @@ class Learner {
/**
* @brief Get a query's priority level given its ID.
+ *
+ * @note This version assumes that the given query ID exists in data
+ * structures.
**/
- inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+ inline const std::size_t getQueryPriorityUnsafe(const std::size_t query_id) const {
const auto it = query_id_to_priority_lookup_.find(query_id);
DCHECK(it != query_id_to_priority_lookup_.end());
return it->second;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f379b468/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 44ccb0a..6467367 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -53,6 +53,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+ priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
return true;
} else {
@@ -71,6 +72,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
// QueryManager, so that we need to extract message from the
// TaggedMessage only once.
std::size_t query_id;
+ std::size_t operator_id;
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
serialization::NormalWorkOrderCompletionMessage proto;
@@ -79,6 +81,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ operator_id = proto.operator_index();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
learner_->addCompletionFeedback(proto);
@@ -94,6 +97,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ operator_id = proto.operator_index();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
break;
@@ -132,6 +136,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
const QueryManager::QueryStatusCode return_code =
admitted_queries_[query_id]->processMessage(tagged_message);
+ // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
if (!waiting_queries_.empty()) {
@@ -140,6 +145,8 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
waiting_queries_.pop();
admitQuery(new_query);
}
+ } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
+ learner_->removeOperator(query_id, operator_id);
}
}
@@ -161,26 +168,25 @@ void PriorityPolicyEnforcer::getWorkerMessages(
DCHECK_GT(per_query_share, 0u);
std::vector<std::size_t> finished_queries_ids;
- for (const auto &admitted_query_info : admitted_queries_) {
- QueryManager *curr_query_manager = admitted_query_info.second.get();
- DCHECK(curr_query_manager != nullptr);
- std::size_t messages_collected_curr_query = 0;
- while (messages_collected_curr_query < per_query_share) {
+ if (learner_->hasActiveQueries()) {
+ // Key = priority level. Value = whether we have already checked that level.
+ std::unordered_map<std::size_t, bool> checked_priority_levels;
+ // While there are more priority levels to be checked ..
+ while (checked_priority_levels.size() < priority_query_ids_.size()) {
+ const int chosen_priority_level = learner_->pickRandomPriorityLevel();
+ DCHECK(chosen_priority_level != kInvalidPriorityLevel);
WorkerMessage *next_worker_message =
- curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+ getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+ &finished_queries_ids);
if (next_worker_message != nullptr) {
- ++messages_collected_curr_query;
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
} else {
- // No more work ordes from the current query at this time.
- // Check if the query's execution is over.
- if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- // If the query has been executed, remove it.
- finished_queries_ids.push_back(admitted_query_info.first);
- }
- break;
+ checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
}
}
+ } else {
+ LOG(INFO) << "No active queries in the learner at this point.";
+ return;
}
for (const std::size_t finished_qid : finished_queries_ids) {
removeQuery(finished_qid);
@@ -194,6 +200,22 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
<< " that hasn't finished its execution";
}
admitted_queries_.erase(query_id);
+ // Remove the query from priority_query_ids_ structure.
+ const int query_priority = learner_->getQueryPriority(query_id);
+ DCHECK(query_priority != kInvalidPriorityLevel);
+ const std::size_t query_priority_unsigned =
+ static_cast<std::size_t>(query_priority);
+ std::vector<std::size_t> *query_ids_for_priority_level =
+ &priority_query_ids_[query_priority_unsigned];
+ query_ids_for_priority_level->erase(
+ std::remove(query_ids_for_priority_level->begin(),
+ query_ids_for_priority_level->end(),
+ query_id));
+ if (query_ids_for_priority_level->empty()) {
+ // No more queries for the given priority level. Remove the entry.
+ priority_query_ids_.erase(query_priority_unsigned);
+ }
+ // Remove the query from the learner.
learner_->removeQuery(query_id);
}
@@ -219,4 +241,40 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
proto.execution_time_in_microseconds());
}
+WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
+ const std::size_t priority_level,
+ std::vector<std::size_t> *finished_queries_ids) {
+ // Key = query ID from the given priority level, value = whether we have
+ // checked this query earlier.
+ std::unordered_map<std::size_t, bool> checked_query_ids;
+ // While there are more queries to be checked ..
+ while (checked_query_ids.size() < priority_query_ids_[priority_level].size()) {
+ const int chosen_query_id = learner_->pickRandomQueryFromPriorityLevel(priority_level);
+ if (chosen_query_id == kInvalidQueryID) {
+ // No query available at this time in this priority level.
+ return nullptr;
+ } else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
+ // This query ID was already checked and had no work; pick another one.
+ LOG(INFO) << "We have already seen this query, continue";
+ continue;
+ } else {
+ // We haven't seen this query earlier. Check if it has any schedulable
+ // WorkOrder.
+ QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(chosen_query_id)].get();
+ DCHECK(chosen_query_manager != nullptr);
+ std::unique_ptr<WorkerMessage> next_worker_message(chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+ if (next_worker_message != nullptr) {
+ return next_worker_message.release();
+ } else {
+ // This query doesn't have any WorkerMessage right now. Mark as checked.
+ checked_query_ids[chosen_query_id] = true;
+ if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+ }
+ }
+ }
+ }
+ return nullptr;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f379b468/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 94cbe38..281c066 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -183,6 +183,21 @@ class PriorityPolicyEnforcer {
void recordTimeForWorkOrder(
const serialization::NormalWorkOrderCompletionMessage &proto);
+ /**
+ * @brief Get a WorkerMessage from a query in the given priority level.
+ *
+ * @param priority_level The priority level from which the query will be
+ * chosen.
+ * @param finished_queries_ids Output vector; IDs of queries found to have
+ * finished their execution are appended to it.
+ *
+ * @return A WorkerMessage, or nullptr if no query in this priority level
+ * currently has a WorkerMessage to offer.
+ **/
+ WorkerMessage *getNextWorkerMessageFromPriorityLevel(
+ const std::size_t priority_level,
+ std::vector<std::size_t> *finished_queries_ids);
+
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
@@ -193,6 +208,10 @@ class PriorityPolicyEnforcer {
tmb::MessageBus *bus_;
const bool profile_individual_workorders_;
+ // Key = priority level, value = a vector of IDs of the queries belonging to
+ // the key priority level.
+ std::unordered_map<std::size_t, std::vector<std::size_t>> priority_query_ids_;
+
// Key = query ID, value = QueryManager* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;