You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ha...@apache.org on 2016/06/23 20:16:35 UTC

[09/17] incubator-quickstep git commit: Basic support to report individual work order profiling results

Basic support to report individual work order profiling results

- A flag to enable work order profiling report generation.
- At the end of each query, a report is generated which includes worker
  ID, its NUMA socket, the operator that produced the WorkOrder
  and the execution time in microseconds for the latest query.
- The output is printed on stdout in CSV format as of now.
- As this is only rudimentary support for the functionality, there is a lot of
  future work in this regard, which includes printing CPU core information,
  printing the operator name, allowing the user to specify a file where the
  output can be written, etc.
- Fixed a bug in constructing Foreman thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/07435a43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/07435a43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/07435a43

Branch: refs/heads/decimal-type
Commit: 07435a430776c0b8b6381a4c0f0470250814c14b
Parents: c1476d1
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 16 14:03:34 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Jun 20 09:56:52 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp               | 12 +++++++-
 query_execution/CMakeLists.txt     |  2 ++
 query_execution/Foreman.cpp        | 26 +++++++++++++++--
 query_execution/Foreman.hpp        | 22 ++++++++++++++-
 query_execution/PolicyEnforcer.cpp | 15 ++++++++++
 query_execution/PolicyEnforcer.hpp | 50 +++++++++++++++++++++++++++++++--
 6 files changed, 121 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 35bd16e..3f99130 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -137,6 +137,9 @@ static constexpr char kPathSeparator = '/';
 static constexpr char kDefaultStoragePath[] = "qsstor/";
 #endif
 
+DEFINE_bool(profile_and_report_workorder_perf, false,
+    "If true, Quickstep will record the execution time of all the individual "
+    "normal work orders and report it at the end of query execution.");
 DEFINE_int32(num_workers, 0, "Number of worker threads. If this value is "
                              "specified and is greater than 0, then this "
                              "user-supplied value is used. Else (i.e. the"
@@ -356,7 +359,9 @@ int main(int argc, char* argv[]) {
                   &bus,
                   query_processor->getDefaultDatabase(),
                   query_processor->getStorageManager(),
-                  num_numa_nodes_system);
+                  -1,  // Don't pin the Foreman thread.
+                  num_numa_nodes_system,
+                  quickstep::FLAGS_profile_and_report_workorder_perf);
 
   // Start the worker threads.
   for (Worker &worker : workers) {
@@ -461,6 +466,11 @@ int main(int argc, char* argv[]) {
           printf("Time: %s ms\n",
                  quickstep::DoubleToStringWithSignificantDigits(
                      time_ms.count(), 3).c_str());
+          if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+            // TODO(harshad) - Allow user specified file instead of stdout.
+            foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+                                                   stdout);
+          }
         } catch (const std::exception &e) {
           fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
           break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 501166e..b031a44 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department, 
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 828834d..f9f2e7a 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -18,7 +18,9 @@
 #include "query_execution/Foreman.hpp"
 
 #include <cstddef>
+#include <cstdio>
 #include <memory>
+#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -54,7 +56,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
                  const int cpu_id,
-                 const size_t num_numa_nodes)
+                 const size_t num_numa_nodes,
+                 const bool profile_individual_workorders)
     : ForemanLite(bus, cpu_id),
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
@@ -90,7 +93,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       catalog_database_,
       storage_manager_,
       worker_directory_,
-      bus_));
+      bus_,
+      profile_individual_workorders));
 }
 
 void Foreman::run() {
@@ -229,4 +233,22 @@ void Foreman::sendWorkerMessage(const size_t worker_thread_index,
       << worker_directory_->getClientID(worker_thread_index);
 }
 
+void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
+                                             std::FILE *out) const {
+  const std::vector<
+      std::tuple<std::size_t, std::size_t, std::size_t>>
+      &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+  fputs("Worker ID, NUMA Socket, Operator ID, Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    // Tuple layout: <0> worker thread index, <1> operator ID, <2> time (us).
+    const std::size_t worker_id = std::get<0>(workorder_entry);
+    fprintf(out,
+            "%zu, %d, %zu, %zu\n",  // %zu is the portable size_t conversion.
+            worker_id,
+            worker_directory_->getNUMANode(worker_id),
+            std::get<1>(workorder_entry),
+            std::get<2>(workorder_entry));
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 94cb9fc..7be57e7 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -19,6 +19,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 
 #include <cstddef>
+#include <cstdio>
 #include <memory>
 #include <vector>
 
@@ -57,6 +58,8 @@ class Foreman final : public ForemanLite {
    * @param storage_manager The StorageManager to use.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    * @param num_numa_nodes The number of NUMA nodes in the system.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        should be profiled or not.
    *
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
@@ -67,10 +70,27 @@ class Foreman final : public ForemanLite {
           CatalogDatabaseLite *catalog_database,
           StorageManager *storage_manager,
           const int cpu_id = -1,
-          const std::size_t num_numa_nodes = 1);
+          const std::size_t num_numa_nodes = 1,
+          const bool profile_individual_workorders = false);
 
   ~Foreman() override {}
 
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+   * will require modifying the WorkerDirectory to remember worker affinities.
+   * Until then, the users can refer to the worker_affinities provided to the
+   * cli to infer the CPU core ID where a given worker is pinned.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
  protected:
   void run() override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 9f0502d..84aa86a 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -76,6 +76,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       query_id = proto.query_id();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
+      if (profile_individual_workorders_) {
+        recordTimeForWorkOrder(proto);
+      }
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
@@ -197,4 +200,16 @@ bool PolicyEnforcer::admitQueries(
   return true;
 }
 
+void PolicyEnforcer::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  // operator[] default-constructs an empty vector on the first record for a
+  // query, so no separate existence check is needed before appending the
+  // (worker thread index, operator index, execution time) entry.
+  const std::size_t query_id = proto.query_id();
+  workorder_time_recorder_[query_id].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 9f87056..470ff2a 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <memory>
 #include <queue>
+#include <tuple>
 #include <unordered_map>
 #include <vector>
 
@@ -62,13 +63,15 @@ class PolicyEnforcer {
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
                  WorkerDirectory *worker_directory,
-                 tmb::MessageBus *bus)
+                 tmb::MessageBus *bus,
+                 const bool profile_individual_workorders = false)
       : foreman_client_id_(foreman_client_id),
         num_numa_nodes_(num_numa_nodes),
         catalog_database_(catalog_database),
         storage_manager_(storage_manager),
         worker_directory_(worker_directory),
-        bus_(bus) {}
+        bus_(bus),
+        profile_individual_workorders_(profile_individual_workorders) {}
 
   /**
    * @brief Destructor.
@@ -143,9 +146,40 @@ class PolicyEnforcer {
     return !(admitted_queries_.empty() && waiting_queries_.empty());
   }
 
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry.
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);  // Profiling must be enabled.
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());  // An entry must already exist.
+    return workorder_time_recorder_.at(query_id);  // at() throws if absent.
+  }
+
  private:
   static constexpr std::size_t kMaxConcurrentQueries = 1;
 
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -154,6 +188,7 @@ class PolicyEnforcer {
   WorkerDirectory *worker_directory_;
 
   tmb::MessageBus *bus_;
+  const bool profile_individual_workorders_;
 
   // Key = query ID, value = QueryManager* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
@@ -161,6 +196,17 @@ class PolicyEnforcer {
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 
+  // Key = Query ID.
+  // Value = A tuple indicating a record of executing a work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
   DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
 };