You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/02 23:17:42 UTC

[06/20] incubator-quickstep git commit: Add individual query time printing

Add individual query time printing

- Query entry, admission, and completion times are now recorded in the
  QueryHandle class.
- Bug fix in PriorityPolicyEnforcer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2d2c2845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2d2c2845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2d2c2845

Branch: refs/heads/scheduler++
Commit: 2d2c284520e1a76d8ab21facfbb474fcb6b1bd65
Parents: c53cfc9
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 15:00:44 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jul 2 18:16:30 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                       |  5 +++
 query_execution/Learner.cpp                |  1 +
 query_execution/Learner.hpp                |  3 +-
 query_execution/PriorityPolicyEnforcer.cpp | 11 ++++---
 query_execution/PriorityPolicyEnforcer.hpp |  4 +++
 query_optimizer/QueryHandle.hpp            | 41 +++++++++++++++++++++++++
 6 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3010ccc..4906f7a 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -19,6 +19,7 @@
 
 /* A standalone command-line interface to QuickStep */
 
+#include <algorithm>
 #include <chrono>
 #include <cstddef>
 #include <cstdio>
@@ -530,6 +531,10 @@ int main(int argc, char* argv[]) {
               for (std::size_t i = 0; i < query_handles.size(); ++i) {
                 InputParserUtil::PrintAndDropOutputRelation(
                     query_handles[i], query_processor.get());
+                printf("Time: %s ms\n",
+                       quickstep::DoubleToStringWithSignificantDigits(
+                           query_handles[i]->getExecutionTimeMillis(), 3)
+                           .c_str());
               }
               query_processor->saveCatalog();
               if (quickstep::FLAGS_profile_and_report_workorder_perf) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 9801f60..183d688 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -245,6 +245,7 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     execution_stats_.erase(priority_level);
     current_probabilities_.erase(priority_level);
     probabilities_of_priority_levels_->removeObject(priority_level);
+    default_probabilities_.erase(priority_level);
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // has_feedback_from_all_queries_.erase(priority_level);
     if (hasActiveQueries()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index ef92db9..024dc9b 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -276,7 +276,7 @@ class Learner {
     CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
-      execution_stats_[priority_level];
+      execution_stats_.emplace(priority_level, std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>());
       if (static_cast<int>(priority_level) > highest_priority_level_) {
         highest_priority_level_ = priority_level;
       }
@@ -380,6 +380,7 @@ class Learner {
       const std::size_t priority_level) const {
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // return has_feedback_from_all_queries_.at(priority_level);
+    DCHECK(isPriorityLevelPresent(priority_level));
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_.at(priority_level);
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 0a15094..93908a9 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -43,7 +43,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
@@ -52,9 +51,11 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
+      query_handle->setAdmissionTime();
+      query_id_to_handle_[query_handle->query_id()] = query_handle;
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -63,6 +64,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
   } else {
     // This query will have to wait.
     LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+    query_id_to_handle_[query_handle->query_id()] = query_handle;
     waiting_queries_.push(query_handle);
     return false;
   }
@@ -224,10 +226,11 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
     // No more queries for the given priority level. Remove the entry.
     priority_query_ids_.erase(query_priority_unsigned);
   }
+  query_id_to_handle_[query_id]->setCompletionTime();
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
-  LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
-  // Admit waiting queries, if any.
+  // TODO(harshad) - Admit waiting queries, if any.
+  LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
 bool PriorityPolicyEnforcer::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index eafb099..1f13a10 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -218,6 +218,10 @@ class PriorityPolicyEnforcer {
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 
+  // Key = query ID, value = a pointer to the QueryHandle.
+  // Note - This map has entries for active and waiting queries only.
+  std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;
+
   // Key = Query ID.
   // Value = A tuple indicating a record of executing a work order.
   // Within a tuple ...

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d2c2845/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..04f672e 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -17,6 +17,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
 
+#include <chrono>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -119,6 +120,39 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+  void setEntryTime() {
+    entry_time_ = std::chrono::steady_clock::now();
+  }
+
+  void setAdmissionTime() {
+    admission_time_ = std::chrono::steady_clock::now();
+  }
+
+  void setCompletionTime() {
+    completion_time_ = std::chrono::steady_clock::now();
+  }
+
+  const std::chrono::steady_clock::time_point& getEntryTime() const {
+    return entry_time_;
+  }
+
+  const std::chrono::steady_clock::time_point& getAdmissionTime() const {
+    return admission_time_;
+  }
+
+  const std::chrono::steady_clock::time_point& getCompletionTime() const {
+    return completion_time_;
+  }
+
+  const double getExecutionTimeMillis() const {
+    return std::chrono::duration<double, std::milli>(completion_time_ - admission_time_)
+               .count();
+  }
+
+  const double getWaitingTimeMillis() const {
+    return std::chrono::duration<double, std::milli>(admission_time_ - entry_time_).count();
+  }
+
  private:
   const std::size_t query_id_;
   const std::uint64_t query_priority_;
@@ -134,6 +168,13 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+  // Time when query entered the system.
+  std::chrono::steady_clock::time_point entry_time_;
+  // Time when query was admitted to the system.
+  std::chrono::steady_clock::time_point admission_time_;
+  // Time when query finished its execution.
+  std::chrono::steady_clock::time_point completion_time_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };