You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 18:02:31 UTC
incubator-quickstep git commit: Function to log work order execution
statistics. - Change log severity levels. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/scheduler++ f9a0696f0 -> 4c3169e45 (forced update)
Function to log work order execution statistics.
- Change log severity levels.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4c3169e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4c3169e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4c3169e4
Branch: refs/heads/scheduler++
Commit: 4c3169e45cbcd4b1206274315e8b0dbfbb46cd10
Parents: efd88a7
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jul 1 11:38:18 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Fri Jul 1 13:01:44 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 26 +++++++++++++++++++----
query_execution/Learner.hpp | 3 +++
query_execution/PriorityPolicyEnforcer.cpp | 10 ++++-----
query_execution/QueryExecutionMessages.proto | 6 ++++++
query_execution/Worker.cpp | 2 ++
5 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 3c6f42b..e8dc4b0 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -61,16 +61,17 @@ void Learner::addCompletionFeedback(
}
updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
updateProbabilitiesOfAllPriorityLevels();
- printProbabilitiesForPriorityLevel(priority_level);
+ // printProbabilitiesForPriorityLevel(priority_level);
+ printWorkOrderDetails(workorder_completion_proto);
}
void Learner::updateProbabilitiesForQueriesInPriorityLevel(
const std::size_t priority_level, const std::size_t query_id) {
DCHECK(isPriorityLevelPresent(priority_level));
if (execution_stats_[priority_level].empty()) {
- LOG(INFO) << "Updating probabilities for query ID: " << query_id
- << " and priority level: " << priority_level
- << " that has no queries";
+ LOG(WARNING) << "Updating probabilities for query ID: " << query_id
+ << " and priority level: " << priority_level
+ << " that has no queries";
return;
} else if (execution_stats_[priority_level].size() == 1u) {
DCHECK(current_probabilities_[priority_level] != nullptr);
@@ -286,4 +287,21 @@ void Learner::printProbabilitiesForPriorityLevel(const std::size_t priority_leve
}
}
+void Learner::printWorkOrderDetails(
+ const serialization::NormalWorkOrderCompletionMessage &proto) const {
+ // Format (fields separated by '|'): Query ID|Operator ID|Worker ID|Execution time in micros|WO execution end timestamp.
+ std::string result = "";
+ result.reserve(30);
+ result += std::to_string(proto.query_id()); // 2 chars
+ result += "|";
+ result += std::to_string(proto.operator_index()); // 2 chars
+ result += "|";
+ result += std::to_string(proto.worker_thread_index()); // 2 chars
+ result += "|";
+ result += std::to_string(proto.execution_time_in_microseconds()); // 5 chars
+ result += "|";
+ result += std::to_string(proto.execution_end_timestamp()); // 12 chars
+ LOG(INFO) << result;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 6cea325..8654544 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -472,6 +472,9 @@ class Learner {
void printProbabilitiesForPriorityLevel(const std::size_t priority_level);
+ void printWorkOrderDetails(
+ const serialization::NormalWorkOrderCompletionMessage &proto) const;
+
// Key = Priority level, value = A vector of pairs.
// Each pair:
// 1st element: Query ID.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 93908a9..63f0383 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -51,7 +51,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
admitted_queries_[query_id].reset(
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
- LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
+ DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
query_handle->setAdmissionTime();
@@ -63,7 +63,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
}
} else {
// This query will have to wait.
- LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+ DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
query_id_to_handle_[query_handle->query_id()] = query_handle;
waiting_queries_.push(query_handle);
return false;
@@ -178,11 +178,10 @@ void PriorityPolicyEnforcer::getWorkerMessages(
while (checked_priority_levels.size() < priority_query_ids_.size()) {
const int chosen_priority_level = learner_->pickRandomPriorityLevel();
if (chosen_priority_level == kInvalidPriorityLevel) {
- LOG(INFO) << "No valid priority level chosen";
+ DLOG(INFO) << "No valid priority level chosen";
break;
} else if (checked_priority_levels.find(static_cast<std::size_t>(
chosen_priority_level)) != checked_priority_levels.end()) {
- DLOG(INFO) << "The chosen priority level " << chosen_priority_level << " was checked already";
continue;
} else {
WorkerMessage *next_worker_message =
@@ -230,7 +229,7 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
// Remove the query from the learner.
learner_->removeQuery(query_id);
// TODO(harshad) - Admit waiting queries, if any.
- LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
+ DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
bool PriorityPolicyEnforcer::admitQueries(
@@ -279,7 +278,6 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
std::unique_ptr<WorkerMessage> next_worker_message(
chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
if (next_worker_message != nullptr) {
- // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
return next_worker_message.release();
} else {
// This query doesn't have any WorkerMessage right now. Mark as checked.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..db35ecc 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -39,6 +39,8 @@ message NormalWorkOrderCompletionMessage {
required uint64 worker_thread_index = 2;
required uint64 query_id = 3;
optional uint64 execution_time_in_microseconds = 4;
+ // Time at which the work order execution finished, in microseconds since epoch.
+ optional uint64 execution_end_timestamp = 5;
}
// A message sent upon completion of a rebuild WorkOrder execution.
@@ -47,6 +49,10 @@ message RebuildWorkOrderCompletionMessage {
required uint64 worker_thread_index = 2;
required uint64 query_id = 3;
optional uint64 execution_time_in_microseconds = 4;
+ optional uint64 execution_end_timestamp = 5; // Indicates time elapsed in
+ // microseconds since epoch,
+ // when the work order execution
+ // finished.
}
message CatalogRelationNewBlockMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ae889c7..5ad80ed 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -123,11 +123,13 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
const uint64_t execution_time_microseconds =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
+ const uint64_t execution_end_timestamp = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch()).count();
// Construct the proto message.
proto->set_operator_index(worker_message.getRelationalOpIndex());
proto->set_query_id(query_id_for_workorder);
proto->set_worker_thread_index(worker_thread_index_);
proto->set_execution_time_in_microseconds(execution_time_microseconds);
+ proto->set_execution_end_timestamp(execution_end_timestamp);
}
} // namespace quickstep