You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:11 UTC

[19/50] incubator-quickstep git commit: Refactored printWorkOrderProfilingResults in Foreman.

Refactored printWorkOrderProfilingResults in Foreman.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7fb96f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7fb96f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7fb96f3d

Branch: refs/heads/quickstep_partition_parser_support
Commit: 7fb96f3da140f4631060132b58ded2fd38191167
Parents: 76e37ca
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 18 22:07:27 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 20 19:41:16 2016 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt         |  3 +++
 query_execution/ForemanBase.hpp        | 33 +++++++++++++++++++++++++++++
 query_execution/ForemanDistributed.cpp | 11 ++++++----
 query_execution/ForemanDistributed.hpp | 14 +-----------
 query_execution/ForemanSingleNode.cpp  | 17 ++++++++-------
 query_execution/ForemanSingleNode.hpp  | 29 +------------------------
 query_execution/PolicyEnforcerBase.hpp | 18 ++++++++--------
 7 files changed, 63 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 0f6c282..cf9d5b0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -86,6 +86,7 @@ if (ENABLE_DISTRIBUTED)
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanBase
                       glog
+                      quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
@@ -98,6 +99,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_AdmitRequestMessage
                         quickstep_queryexecution_ForemanBase
+                        quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
@@ -113,6 +115,7 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanBase
+                      quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp
index 951f34d..ee6c7ce 100644
--- a/query_execution/ForemanBase.hpp
+++ b/query_execution/ForemanBase.hpp
@@ -20,6 +20,11 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_
 
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -30,6 +35,8 @@
 
 namespace quickstep {
 
+struct WorkOrderTimeEntry;
+
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -59,6 +66,30 @@ class ForemanBase : public Thread {
   ~ForemanBase() override {}
 
   /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  virtual void printWorkOrderProfilingResults(const std::size_t query_id,
+                                              std::FILE *out) const = 0;
+
+  /**
+   * @brief Get the results of profiling individual work orders for a given
+   *        query.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @return A vector of records, each being a single profiling entry.
+   **/
+  const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults(
+      const std::size_t query_id) const {
+    return policy_enforcer_->getProfilingResults(query_id);
+  }
+
+  /**
    * @brief Get the TMB client ID of Foreman thread.
    *
    * @return TMB client ID of foreman thread.
@@ -77,6 +108,8 @@ class ForemanBase : public Thread {
   // The ID of the CPU that the Foreman thread can optionally be pinned to.
   const int cpu_id_;
 
+  std::unique_ptr<PolicyEnforcerBase> policy_enforcer_;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(ForemanBase);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 56b319b..aa4db17 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -98,12 +99,12 @@ ForemanDistributed::ForemanDistributed(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_.reset(new PolicyEnforcerDistributed(
+  policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
       foreman_client_id_,
       catalog_database_,
       &shiftboss_directory_,
       bus_,
-      profile_individual_workorders));
+      profile_individual_workorders);
 }
 
 void ForemanDistributed::run() {
@@ -180,7 +181,8 @@ void ForemanDistributed::run() {
       }
       case kInitiateRebuildResponseMessage: {
         // A unique case in the distributed version.
-        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+        static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
+            processInitiateRebuildResponseMessage(tagged_message);
         break;
       }
       case kSaveQueryResultResponseMessage: {
@@ -228,7 +230,8 @@ void ForemanDistributed::run() {
 
     if (canCollectNewMessages(message_type)) {
       vector<unique_ptr<S::WorkOrderMessage>> new_messages;
-      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+      static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
+          getWorkOrderProtoMessages(&new_messages);
       dispatchWorkOrderMessages(new_messages);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index b42795c..7f2d2f9 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -24,7 +24,6 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "utility/Macros.hpp"
 
@@ -68,17 +67,8 @@ class ForemanDistributed final : public ForemanBase {
 
   ~ForemanDistributed() override {}
 
-  /**
-   * @brief Print the results of profiling individual work orders for a given
-   *        query.
-   *
-   * TODO(harshad) - Add the name of the operator to the output.
-   *
-   * @param query_id The ID of the query for which the results are to be printed.
-   * @param out The file stream.
-   **/
   void printWorkOrderProfilingResults(const std::size_t query_id,
-                                      std::FILE *out) const;
+                                      std::FILE *out) const override;
 
  protected:
   void run() override;
@@ -120,8 +110,6 @@ class ForemanDistributed final : public ForemanBase {
 
   CatalogDatabaseLite *catalog_database_;
 
-  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
-
   // From a query id to a set of Shiftbosses that save query result.
   std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 4661c37..dfdfb71 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -92,14 +93,14 @@ ForemanSingleNode::ForemanSingleNode(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_.reset(new PolicyEnforcerSingleNode(
+  policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>(
       foreman_client_id_,
       num_numa_nodes,
       catalog_database_,
       storage_manager_,
       worker_directory_,
       bus_,
-      profile_individual_workorders));
+      profile_individual_workorders);
 }
 
 void ForemanSingleNode::run() {
@@ -157,7 +158,8 @@ void ForemanSingleNode::run() {
 
     if (canCollectNewMessages(message_type)) {
       vector<unique_ptr<WorkerMessage>> new_messages;
-      policy_enforcer_->getWorkerMessages(&new_messages);
+      static_cast<PolicyEnforcerSingleNode*>(policy_enforcer_.get())->
+          getWorkerMessages(&new_messages);
       dispatchWorkerMessages(new_messages);
     }
 
@@ -233,13 +235,12 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
   CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
-const std::vector<WorkOrderTimeEntry>& ForemanSingleNode
-    ::getWorkOrderProfilingResults(const std::size_t query_id) const {
-  return policy_enforcer_->getProfilingResults(query_id);
-}
-
 void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id,
                                                        std::FILE *out) const {
+  // TODO(harshad) - Add the CPU core ID of the operator to the output. This
+  // will require modifying the WorkerDirectory to remember worker affinities.
+  // Until then, the users can refer to the worker_affinities provided to the
+  // cli to infer the CPU core ID where a given worker is pinned.
   const std::vector<WorkOrderTimeEntry> &recorded_times =
       policy_enforcer_->getProfilingResults(query_id);
   fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index 5a368aa..4cc7a63 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -26,7 +26,6 @@
 #include <vector>
 
 #include "query_execution/ForemanBase.hpp"
-#include "query_execution/PolicyEnforcerSingleNode.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -78,32 +77,8 @@ class ForemanSingleNode final : public ForemanBase {
 
   ~ForemanSingleNode() override {}
 
-
-  /**
-   * @brief Get the results of profiling individual work orders for a given
-   *        query.
-   *
-   * @param query_id The ID of the query for which the results are to be printed.
-   * @return A vector of records, each being a single profiling entry.
-   **/
-  const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults(
-      const std::size_t query_id) const;
-
-  /**
-   * @brief Print the results of profiling individual work orders for a given
-   *        query.
-   *
-   * TODO(harshad) - Add the name of the operator to the output.
-   * TODO(harshad) - Add the CPU core ID of the operator to the output. This
-   * will require modifying the WorkerDirectory to remember worker affinities.
-   * Until then, the users can refer to the worker_affinities provided to the
-   * cli to infer the CPU core ID where a given worker is pinned.
-   *
-   * @param query_id The ID of the query for which the results are to be printed.
-   * @param out The file stream.
-   **/
   void printWorkOrderProfilingResults(const std::size_t query_id,
-                                      std::FILE *out) const;
+                                      std::FILE *out) const override;
 
  protected:
   void run() override;
@@ -142,8 +117,6 @@ class ForemanSingleNode final : public ForemanBase {
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
-  std::unique_ptr<PolicyEnforcerSingleNode> policy_enforcer_;
-
   DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7fb96f3d/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 414367e..aa070b8 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -146,6 +146,15 @@ class PolicyEnforcerBase {
     return workorder_time_recorder_.at(query_id);
   }
 
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  virtual bool admitQuery(QueryHandle *query_handle) = 0;
+
  protected:
   static constexpr std::size_t kMaxConcurrentQueries = 1;
 
@@ -185,15 +194,6 @@ class PolicyEnforcerBase {
 
  private:
   /**
-   * @brief Admit a query to the system.
-   *
-   * @param query_handle The QueryHandle for the new query.
-   *
-   * @return Whether the query was admitted to the system.
-   **/
-  virtual bool admitQuery(QueryHandle *query_handle) = 0;
-
-  /**
    * @brief Decrement the number of queued workorders for the given worker by 1.
    *
    * @param proto The completion message proto received after the WorkOrder