You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/06/27 21:33:44 UTC
[09/18] incubator-quickstep git commit: Basic support to report
individual work order profiling results
Basic support to report individual work order profiling results
- A flag to enable work order profiling report generation.
- At the end of each query, a report is generated which includes worker
ID, its NUMA socket, the operator that produced the WorkOrder
and the execution time in microseconds for the latest query.
- The output is printed on stdout in CSV format as of now.
- As this is only rudimentary support for the functionality, there is a lot of
future work in this regard, which includes printing CPU core information,
printing the operator name, allowing the user to specify a file where the
output can be written, etc.
- Fixed a bug in constructing Foreman thread.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/07435a43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/07435a43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/07435a43
Branch: refs/heads/travis-grpc
Commit: 07435a430776c0b8b6381a4c0f0470250814c14b
Parents: c1476d1
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 16 14:03:34 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Jun 20 09:56:52 2016 -0500
----------------------------------------------------------------------
cli/QuickstepCli.cpp | 12 +++++++-
query_execution/CMakeLists.txt | 2 ++
query_execution/Foreman.cpp | 26 +++++++++++++++--
query_execution/Foreman.hpp | 22 ++++++++++++++-
query_execution/PolicyEnforcer.cpp | 15 ++++++++++
query_execution/PolicyEnforcer.hpp | 50 +++++++++++++++++++++++++++++++--
6 files changed, 121 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 35bd16e..3f99130 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -137,6 +137,9 @@ static constexpr char kPathSeparator = '/';
static constexpr char kDefaultStoragePath[] = "qsstor/";
#endif
+DEFINE_bool(profile_and_report_workorder_perf, false,
+    "If true, Quickstep will record the execution time of all the individual "
+    "normal work orders and report it at the end of query execution.");
DEFINE_int32(num_workers, 0, "Number of worker threads. If this value is "
"specified and is greater than 0, then this "
"user-supplied value is used. Else (i.e. the"
@@ -356,7 +359,9 @@ int main(int argc, char* argv[]) {
&bus,
query_processor->getDefaultDatabase(),
query_processor->getStorageManager(),
- num_numa_nodes_system);
+ -1, // Don't pin the Foreman thread.
+ num_numa_nodes_system,
+ quickstep::FLAGS_profile_and_report_workorder_perf);
// Start the worker threads.
for (Worker &worker : workers) {
@@ -461,6 +466,11 @@ int main(int argc, char* argv[]) {
printf("Time: %s ms\n",
quickstep::DoubleToStringWithSignificantDigits(
time_ms.count(), 3).c_str());
+ if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+ // TODO(harshad) - Allow user specified file instead of stdout.
+ foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+ stdout);
+ }
} catch (const std::exception &e) {
fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
break;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 501166e..b031a44 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,5 +1,7 @@
# Copyright 2011-2015 Quickstep Technologies LLC.
# Copyright 2015-2016 Pivotal Software, Inc.
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#   University of Wisconsin—Madison.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 828834d..f9f2e7a 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -18,7 +18,9 @@
#include "query_execution/Foreman.hpp"
#include <cstddef>
+#include <cstdio>
#include <memory>
+#include <tuple>
#include <utility>
#include <vector>
@@ -54,7 +56,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
const int cpu_id,
- const size_t num_numa_nodes)
+ const size_t num_numa_nodes,
+ const bool profile_individual_workorders)
: ForemanLite(bus, cpu_id),
main_thread_client_id_(main_thread_client_id),
worker_directory_(DCHECK_NOTNULL(worker_directory)),
@@ -90,7 +93,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
catalog_database_,
storage_manager_,
worker_directory_,
- bus_));
+ bus_,
+ profile_individual_workorders));
}
void Foreman::run() {
@@ -229,4 +233,22 @@ void Foreman::sendWorkerMessage(const size_t worker_thread_index,
<< worker_directory_->getClientID(worker_thread_index);
}
+void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
+                                             std::FILE *out) const {
+  // Tuple layout: <worker thread index, operator index, time in microseconds>.
+  const auto &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+  // CSV header row, followed by one row per recorded work order.
+  fputs("Worker ID, NUMA Socket, Operator ID, Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    const std::size_t worker_id = std::get<0>(workorder_entry);
+    // %zu is the portable conversion for std::size_t (%lu breaks on LLP64).
+    fprintf(out,
+            "%zu, %d, %zu, %zu\n",
+            worker_id,
+            worker_directory_->getNUMANode(worker_id),
+            std::get<1>(workorder_entry),
+            std::get<2>(workorder_entry));
+  }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 94cb9fc..7be57e7 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -19,6 +19,7 @@
#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
#include <cstddef>
+#include <cstdio>
#include <memory>
#include <vector>
@@ -57,6 +58,8 @@ class Foreman final : public ForemanLite {
* @param storage_manager The StorageManager to use.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
* @param num_numa_nodes The number of NUMA nodes in the system.
+ * @param profile_individual_workorders Whether the execution of every
+ *        individual work order should be profiled.
*
* @note If cpu_id is not specified, Foreman thread can be possibly moved
* around on different CPUs by the OS.
@@ -67,10 +70,27 @@ class Foreman final : public ForemanLite {
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
const int cpu_id = -1,
- const std::size_t num_numa_nodes = 1);
+ const std::size_t num_numa_nodes = 1,
+ const bool profile_individual_workorders = false);
~Foreman() override {}
+ /**
+ * @brief Print the results of profiling individual work orders for a given
+ * query.
+ *
+ * TODO(harshad) - Add the name of the operator to the output.
+ * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+ * will require modifying the WorkerDirectory to remember worker affinities.
+ * Until then, the users can refer to the worker_affinities provided to the
+ * cli to infer the CPU core ID where a given worker is pinned.
+ *
+ * @param query_id The ID of the query for which the results are to be printed.
+ * @param out The file stream.
+ **/
+ void printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const;
+
protected:
void run() override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 9f0502d..84aa86a 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -76,6 +76,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
query_id = proto.query_id();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
+ if (profile_individual_workorders_) {
+ recordTimeForWorkOrder(proto);
+ }
break;
}
case kRebuildWorkOrderCompleteMessage: {
@@ -197,4 +200,16 @@ bool PolicyEnforcer::admitQueries(
return true;
}
+void PolicyEnforcer::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  // Appends one <worker thread index, operator index, execution time in
+  // microseconds> entry to the list kept for the message's query.
+  // operator[] default-constructs an empty vector the first time a query ID
+  // is seen, so no separate find()/insert step is needed.
+  workorder_time_recorder_[proto.query_id()].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 9f87056..470ff2a 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -21,6 +21,7 @@
#include <cstddef>
#include <memory>
#include <queue>
+#include <tuple>
#include <unordered_map>
#include <vector>
@@ -62,13 +63,15 @@ class PolicyEnforcer {
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
WorkerDirectory *worker_directory,
- tmb::MessageBus *bus)
+ tmb::MessageBus *bus,
+ const bool profile_individual_workorders = false)
: foreman_client_id_(foreman_client_id),
num_numa_nodes_(num_numa_nodes),
catalog_database_(catalog_database),
storage_manager_(storage_manager),
worker_directory_(worker_directory),
- bus_(bus) {}
+ bus_(bus),
+ profile_individual_workorders_(profile_individual_workorders) {}
/**
* @brief Destructor.
@@ -143,9 +146,40 @@ class PolicyEnforcer {
return !(admitted_queries_.empty() && waiting_queries_.empty());
}
+ /**
+ * @brief Get the profiling results for individual work order execution for a
+ * given query.
+ *
+ * @note This function should only be called if profiling individual work
+ * orders option is enabled.
+ *
+ * @param query_id The ID of the query for which the profiling results are
+ * requested.
+ *
+ * @return A vector of tuples, each being a single profiling entry.
+ **/
+ inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+ getProfilingResults(const std::size_t query_id) const {
+   // Sanity checks: profiling must be enabled and the query must have an entry.
+   DCHECK(profile_individual_workorders_);
+   DCHECK(workorder_time_recorder_.count(query_id) != 0u);
+   return workorder_time_recorder_.at(query_id);
+ }
+
private:
static constexpr std::size_t kMaxConcurrentQueries = 1;
+ /**
+ * @brief Record the execution time for a finished WorkOrder.
+ *
+ * TODO(harshad) - Extend the functionality to rebuild work orders.
+ *
+ * @param proto The completion message proto sent after the WorkOrder
+ * execution.
+ **/
+ void recordTimeForWorkOrder(
+ const serialization::NormalWorkOrderCompletionMessage &proto);
+
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
@@ -154,6 +188,7 @@ class PolicyEnforcer {
WorkerDirectory *worker_directory_;
tmb::MessageBus *bus_;
+ const bool profile_individual_workorders_;
// Key = query ID, value = QueryManager* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
@@ -161,6 +196,17 @@ class PolicyEnforcer {
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
+ // Key = Query ID.
+ // Value = A tuple indicating a record of executing a work order.
+ // Within a tuple ...
+ // 1st element: Logical worker ID.
+ // 2nd element: Operator ID.
+ // 3rd element: Time in microseconds to execute the work order.
+ std::unordered_map<
+ std::size_t,
+ std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+ workorder_time_recorder_;
+
DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
};