You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/01 18:02:31 UTC

incubator-quickstep git commit: Function to log work order execution statistics. - Change log severity levels. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/scheduler++ f9a0696f0 -> 4c3169e45 (forced update)


Function to log work order execution statistics.
- Change log severity levels.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4c3169e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4c3169e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4c3169e4

Branch: refs/heads/scheduler++
Commit: 4c3169e45cbcd4b1206274315e8b0dbfbb46cd10
Parents: efd88a7
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jul 1 11:38:18 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Fri Jul 1 13:01:44 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                  | 26 +++++++++++++++++++----
 query_execution/Learner.hpp                  |  3 +++
 query_execution/PriorityPolicyEnforcer.cpp   | 10 ++++-----
 query_execution/QueryExecutionMessages.proto |  6 ++++++
 query_execution/Worker.cpp                   |  2 ++
 5 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 3c6f42b..e8dc4b0 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -61,16 +61,17 @@ void Learner::addCompletionFeedback(
   }
   updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
   updateProbabilitiesOfAllPriorityLevels();
-  printProbabilitiesForPriorityLevel(priority_level);
+  // printProbabilitiesForPriorityLevel(priority_level);
+  printWorkOrderDetails(workorder_completion_proto);
 }
 
 void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     const std::size_t priority_level, const std::size_t query_id) {
   DCHECK(isPriorityLevelPresent(priority_level));
   if (execution_stats_[priority_level].empty()) {
-    LOG(INFO) << "Updating probabilities for query ID: " << query_id
-              << " and priority level: " << priority_level
-              << " that has no queries";
+    LOG(WARNING) << "Updating probabilities for query ID: " << query_id
+                 << " and priority level: " << priority_level
+                 << " that has no queries";
     return;
   } else if (execution_stats_[priority_level].size() == 1u) {
     DCHECK(current_probabilities_[priority_level] != nullptr);
@@ -286,4 +287,21 @@ void Learner::printProbabilitiesForPriorityLevel(const std::size_t priority_leve
   }
 }
 
+void Learner::printWorkOrderDetails(
+    const serialization::NormalWorkOrderCompletionMessage &proto) const {
+  // Logs one '|'-separated record at INFO: Query ID|Operator ID|Worker ID|Time in micros|WO execution end timestamp.
+  std::string result = "";
+  result.reserve(30);  // Rough size hint based on the per-field estimates below; the string still grows if a record is longer.
+  result += std::to_string(proto.query_id());  // 2 chars (estimate)
+  result += "|";
+  result += std::to_string(proto.operator_index());  // 2 chars (estimate)
+  result += "|";
+  result += std::to_string(proto.worker_thread_index());  // 2 chars (estimate)
+  result += "|";
+  result += std::to_string(proto.execution_time_in_microseconds());  // 5 chars (estimate). NOTE(review): Worker.cpp fills this field via duration_cast<milliseconds> -- confirm the unit.
+  result += "|";
+  result += std::to_string(proto.execution_end_timestamp());  // Microseconds since epoch per the .proto comment, so likely ~16 chars rather than 12 -- TODO confirm.
+  LOG(INFO) << result;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 6cea325..8654544 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -472,6 +472,9 @@ class Learner {
 
   void printProbabilitiesForPriorityLevel(const std::size_t priority_level);
 
+  void printWorkOrderDetails(
+      const serialization::NormalWorkOrderCompletionMessage &proto) const;
+
   // Key = Priority level, value = A vector of pairs.
   // Each pair:
   // 1st element: Query ID.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 93908a9..63f0383 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -51,7 +51,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
+      DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       query_handle->setAdmissionTime();
@@ -63,7 +63,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
-    LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+    DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
     query_id_to_handle_[query_handle->query_id()] = query_handle;
     waiting_queries_.push(query_handle);
     return false;
@@ -178,11 +178,10 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     while (checked_priority_levels.size() < priority_query_ids_.size()) {
       const int chosen_priority_level = learner_->pickRandomPriorityLevel();
       if (chosen_priority_level == kInvalidPriorityLevel) {
-        LOG(INFO) << "No valid priority level chosen";
+        DLOG(INFO) << "No valid priority level chosen";
         break;
       } else if (checked_priority_levels.find(static_cast<std::size_t>(
                      chosen_priority_level)) != checked_priority_levels.end()) {
-        DLOG(INFO) << "The chosen priority level " << chosen_priority_level << " was checked already";
         continue;
       } else {
         WorkerMessage *next_worker_message =
@@ -230,7 +229,7 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
   // TODO(harshad) - Admit waiting queries, if any.
-  LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
+  DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
 bool PriorityPolicyEnforcer::admitQueries(
@@ -279,7 +278,6 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
           std::unique_ptr<WorkerMessage> next_worker_message(
               chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
           if (next_worker_message != nullptr) {
-            // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
             return next_worker_message.release();
           } else {
             // This query doesn't have any WorkerMessage right now. Mark as checked.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..db35ecc 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -39,6 +39,8 @@ message NormalWorkOrderCompletionMessage {
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
   optional uint64 execution_time_in_microseconds = 4;
+  // Time at which the work order execution ended, in microseconds since epoch.
+  optional uint64 execution_end_timestamp = 5;  
 }
 
 // A message sent upon completion of a rebuild WorkOrder execution.
@@ -47,6 +49,10 @@ message RebuildWorkOrderCompletionMessage {
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
   optional uint64 execution_time_in_microseconds = 4;
+  optional uint64 execution_end_timestamp = 5;  // Time at which the work order
+                                                // execution finished, in
+                                                // microseconds elapsed since
+                                                // epoch.
 }
 
 message CatalogRelationNewBlockMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4c3169e4/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ae889c7..5ad80ed 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -123,11 +123,13 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   const uint64_t execution_time_microseconds =
       std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
           .count();
+  const uint64_t execution_end_timestamp = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch()).count();
   // Construct the proto message.
   proto->set_operator_index(worker_message.getRelationalOpIndex());
   proto->set_query_id(query_id_for_workorder);
   proto->set_worker_thread_index(worker_thread_index_);
   proto->set_execution_time_in_microseconds(execution_time_microseconds);
+  proto->set_execution_end_timestamp(execution_end_timestamp);
 }
 
 }  // namespace quickstep