You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 04:41:48 UTC
[18/18] incubator-quickstep git commit: Add individual query time printing
Add individual query time printing
- Query entry/admission and completion times are made part of the query
handle class.
- Bug fix in PriorityPolicyEnforcer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/96354210
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/96354210
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/96354210
Branch: refs/heads/scheduler++
Commit: 96354210f40e73048d4b47836f9ecbf96b616a72
Parents: f26bc4e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 15:00:44 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 23:41:05 2016 -0500
----------------------------------------------------------------------
cli/QuickstepCli.cpp | 5 +++
query_execution/Learner.cpp | 1 +
query_execution/Learner.hpp | 3 +-
query_execution/PriorityPolicyEnforcer.cpp | 11 ++++---
query_execution/PriorityPolicyEnforcer.hpp | 4 +++
query_optimizer/QueryHandle.hpp | 41 +++++++++++++++++++++++++
6 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3010ccc..4906f7a 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -19,6 +19,7 @@
/* A standalone command-line interface to QuickStep */
+#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdio>
@@ -530,6 +531,10 @@ int main(int argc, char* argv[]) {
for (std::size_t i = 0; i < query_handles.size(); ++i) {
InputParserUtil::PrintAndDropOutputRelation(
query_handles[i], query_processor.get());
+ printf("Time: %s ms\n",
+ quickstep::DoubleToStringWithSignificantDigits(
+ query_handles[i]->getExecutionTimeMillis(), 3)
+ .c_str());
}
query_processor->saveCatalog();
if (quickstep::FLAGS_profile_and_report_workorder_perf) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 9801f60..183d688 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -245,6 +245,7 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
execution_stats_.erase(priority_level);
current_probabilities_.erase(priority_level);
probabilities_of_priority_levels_->removeObject(priority_level);
+ default_probabilities_.erase(priority_level);
// NOTE(harshad) : Not using this cache as it gets confusing.
// has_feedback_from_all_queries_.erase(priority_level);
if (hasActiveQueries()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index ef92db9..024dc9b 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -276,7 +276,7 @@ class Learner {
CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
- execution_stats_[priority_level];
+ execution_stats_.emplace(priority_level, std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>());
if (static_cast<int>(priority_level) > highest_priority_level_) {
highest_priority_level_ = priority_level;
}
@@ -380,6 +380,7 @@ class Learner {
const std::size_t priority_level) const {
// NOTE(harshad) : Not using this cache as it gets confusing.
// return has_feedback_from_all_queries_.at(priority_level);
+ DCHECK(isPriorityLevelPresent(priority_level));
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_.at(priority_level);
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 0a15094..93908a9 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -43,7 +43,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
@@ -52,9 +51,11 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
admitted_queries_[query_id].reset(
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
- LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+ LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
+ query_handle->setAdmissionTime();
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
return true;
} else {
LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -63,6 +64,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
} else {
// This query will have to wait.
LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
waiting_queries_.push(query_handle);
return false;
}
@@ -224,10 +226,11 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
// No more queries for the given priority level. Remove the entry.
priority_query_ids_.erase(query_priority_unsigned);
}
+ query_id_to_handle_[query_id]->setCompletionTime();
// Remove the query from the learner.
learner_->removeQuery(query_id);
- LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
- // Admit waiting queries, if any.
+ // TODO(harshad) - Admit waiting queries, if any.
+ LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
bool PriorityPolicyEnforcer::admitQueries(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index eafb099..1f13a10 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -218,6 +218,10 @@ class PriorityPolicyEnforcer {
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
+ // Key = query ID, value = a pointer to the QueryHandle.
+ // Note - This map has entries for active and waiting queries only.
+ std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;
+
// Key = Query ID.
// Value = A tuple indicating a record of executing a work order.
// Within a tuple ...
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/96354210/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..04f672e 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -17,6 +17,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
+#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -119,6 +120,39 @@ class QueryHandle {
query_result_relation_ = relation;
}
+ void setEntryTime() {
+ entry_time_ = std::chrono::steady_clock::now();
+ }
+
+ void setAdmissionTime() {
+ admission_time_ = std::chrono::steady_clock::now();
+ }
+
+ void setCompletionTime() {
+ completion_time_ = std::chrono::steady_clock::now();
+ }
+
+ const std::chrono::steady_clock::time_point& getEntryTime() const {
+ return entry_time_;
+ }
+
+ const std::chrono::steady_clock::time_point& getAdmissionTime() const {
+ return admission_time_;
+ }
+
+ const std::chrono::steady_clock::time_point& getCompletionTime() const {
+ return completion_time_;
+ }
+
+ const double getExecutionTimeMillis() const {
+ return std::chrono::duration<double, std::milli>(completion_time_ - admission_time_)
+ .count();
+ }
+
+ const double getWaitingTimeMillis() const {
+ return std::chrono::duration<double, std::milli>(admission_time_ - entry_time_).count();
+ }
+
private:
const std::size_t query_id_;
const std::uint64_t query_priority_;
@@ -134,6 +168,13 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+ // Time when query entered the system.
+ std::chrono::steady_clock::time_point entry_time_;
+ // Time when query was admitted to the system.
+ std::chrono::steady_clock::time_point admission_time_;
+ // Time when query finished its execution.
+ std::chrono::steady_clock::time_point completion_time_;
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};