You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:11 UTC
[19/50] incubator-quickstep git commit: Refactored printWorkOrderProfilingResults in Foreman.
Refactored printWorkOrderProfilingResults in Foreman.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7fb96f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7fb96f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7fb96f3d
Branch: refs/heads/quickstep_partition_parser_support
Commit: 7fb96f3da140f4631060132b58ded2fd38191167
Parents: 76e37ca
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 18 22:07:27 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 20 19:41:16 2016 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 3 +++
query_execution/ForemanBase.hpp | 33 +++++++++++++++++++++++++++++
query_execution/ForemanDistributed.cpp | 11 ++++++----
query_execution/ForemanDistributed.hpp | 14 +-----------
query_execution/ForemanSingleNode.cpp | 17 ++++++++-------
query_execution/ForemanSingleNode.hpp | 29 +------------------------
query_execution/PolicyEnforcerBase.hpp | 18 ++++++++--------
7 files changed, 63 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 0f6c282..cf9d5b0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -86,6 +86,7 @@ if (ENABLE_DISTRIBUTED)
endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanBase
glog
+ quickstep_queryexecution_PolicyEnforcerBase
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
@@ -98,6 +99,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_Catalog_proto
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -113,6 +115,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
index 951f34d..ee6c7ce 100644
--- a/query_execution/ForemanBase.hpp
+++ b/query_execution/ForemanBase.hpp
@@ -20,6 +20,11 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"
@@ -30,6 +35,8 @@
namespace quickstep {
+struct WorkOrderTimeEntry;
+
/** \addtogroup QueryExecution
* @{
*/
@@ -59,6 +66,30 @@ class ForemanBase : public Thread {
~ForemanBase() override {}
/**
+ * @brief Print the results of profiling individual work orders for a given
+ * query.
+ *
+ * TODO(harshad) - Add the name of the operator to the output.
+ *
+ * @param query_id The ID of the query for which the results are to be printed.
+ * @param out The file stream.
+ **/
+ virtual void printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const = 0;
+
+ /**
+ * @brief Get the results of profiling individual work orders for a given
+ * query.
+ *
+ * @param query_id The ID of the query for which the results are to be returned.
+ * @return A vector of records, each being a single profiling entry.
+ **/
+ const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults(
+ const std::size_t query_id) const {
+ return policy_enforcer_->getProfilingResults(query_id);
+ }
+
+ /**
* @brief Get the TMB client ID of Foreman thread.
*
* @return TMB client ID of foreman thread.
@@ -77,6 +108,8 @@ class ForemanBase : public Thread {
// The ID of the CPU that the Foreman thread can optionally be pinned to.
const int cpu_id_;
+ std::unique_ptr<PolicyEnforcerBase> policy_enforcer_;
+
private:
DISALLOW_COPY_AND_ASSIGN(ForemanBase);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 56b319b..aa4db17 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
@@ -98,12 +99,12 @@ ForemanDistributed::ForemanDistributed(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_.reset(new PolicyEnforcerDistributed(
+ policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
foreman_client_id_,
catalog_database_,
&shiftboss_directory_,
bus_,
- profile_individual_workorders));
+ profile_individual_workorders);
}
void ForemanDistributed::run() {
@@ -180,7 +181,8 @@ void ForemanDistributed::run() {
}
case kInitiateRebuildResponseMessage: {
// A unique case in the distributed version.
- policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
+ processInitiateRebuildResponseMessage(tagged_message);
break;
}
case kSaveQueryResultResponseMessage: {
@@ -228,7 +230,8 @@ void ForemanDistributed::run() {
if (canCollectNewMessages(message_type)) {
vector<unique_ptr<S::WorkOrderMessage>> new_messages;
- policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
+ getWorkOrderProtoMessages(&new_messages);
dispatchWorkOrderMessages(new_messages);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index b42795c..7f2d2f9 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -24,7 +24,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "utility/Macros.hpp"
@@ -68,17 +67,8 @@ class ForemanDistributed final : public ForemanBase {
~ForemanDistributed() override {}
- /**
- * @brief Print the results of profiling individual work orders for a given
- * query.
- *
- * TODO(harshad) - Add the name of the operator to the output.
- *
- * @param query_id The ID of the query for which the results are to be printed.
- * @param out The file stream.
- **/
void printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const;
+ std::FILE *out) const override;
protected:
void run() override;
@@ -120,8 +110,6 @@ class ForemanDistributed final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
- std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
-
// From a query id to a set of Shiftbosses that save query result.
std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 4661c37..dfdfb71 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -26,6 +26,7 @@
#include <vector>
#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -92,14 +93,14 @@ ForemanSingleNode::ForemanSingleNode(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_.reset(new PolicyEnforcerSingleNode(
+ policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>(
foreman_client_id_,
num_numa_nodes,
catalog_database_,
storage_manager_,
worker_directory_,
bus_,
- profile_individual_workorders));
+ profile_individual_workorders);
}
void ForemanSingleNode::run() {
@@ -157,7 +158,8 @@ void ForemanSingleNode::run() {
if (canCollectNewMessages(message_type)) {
vector<unique_ptr<WorkerMessage>> new_messages;
- policy_enforcer_->getWorkerMessages(&new_messages);
+ static_cast<PolicyEnforcerSingleNode*>(policy_enforcer_.get())->
+ getWorkerMessages(&new_messages);
dispatchWorkerMessages(new_messages);
}
@@ -233,13 +235,12 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
-const std::vector<WorkOrderTimeEntry>& ForemanSingleNode
- ::getWorkOrderProfilingResults(const std::size_t query_id) const {
- return policy_enforcer_->getProfilingResults(query_id);
-}
-
void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
std::FILE *out) const {
+ // TODO(harshad) - Add the CPU core ID of the operator to the output. This
+ // will require modifying the WorkerDirectory to remember worker affinities.
+ // Until then, the users can refer to the worker_affinities provided to the
+ // cli to infer the CPU core ID where a given worker is pinned.
const std::vector<WorkOrderTimeEntry> &recorded_times =
policy_enforcer_->getProfilingResults(query_id);
fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 5a368aa..4cc7a63 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -26,7 +26,6 @@
#include <vector>
#include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcerSingleNode.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -78,32 +77,8 @@ class ForemanSingleNode final : public ForemanBase {
~ForemanSingleNode() override {}
-
- /**
- * @brief Get the results of profiling individual work orders for a given
- * query.
- *
- * @param query_id The ID of the query for which the results are to be printed.
- * @return A vector of records, each being a single profiling entry.
- **/
- const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults(
- const std::size_t query_id) const;
-
- /**
- * @brief Print the results of profiling individual work orders for a given
- * query.
- *
- * TODO(harshad) - Add the name of the operator to the output.
- * TODO(harshad) - Add the CPU core ID of the operator to the output. This
- * will require modifying the WorkerDirectory to remember worker affinities.
- * Until then, the users can refer to the worker_affinities provided to the
- * cli to infer the CPU core ID where a given worker is pinned.
- *
- * @param query_id The ID of the query for which the results are to be printed.
- * @param out The file stream.
- **/
void printWorkOrderProfilingResults(const std::size_t query_id,
- std::FILE *out) const;
+ std::FILE *out) const override;
protected:
void run() override;
@@ -142,8 +117,6 @@ class ForemanSingleNode final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
- std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
-
DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 414367e..aa070b8 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -146,6 +146,15 @@ class PolicyEnforcerBase {
return workorder_time_recorder_.at(query_id);
}
+ /**
+ * @brief Admit a query to the system.
+ *
+ * @param query_handle The QueryHandle for the new query.
+ *
+ * @return Whether the query was admitted to the system.
+ **/
+ virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
protected:
static constexpr std::size_t kMaxConcurrentQueries = 1;
@@ -185,15 +194,6 @@ class PolicyEnforcerBase {
private:
/**
- * @brief Admit a query to the system.
- *
- * @param query_handle The QueryHandle for the new query.
- *
- * @return Whether the query was admitted to the system.
- **/
- virtual bool admitQuery(QueryHandle *query_handle) = 0;
-
- /**
* @brief Decrement the number of queued workorders for the given worker by 1.
*
* @param proto The completion message proto received after the WorkOrder