You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:10 UTC

[18/50] incubator-quickstep git commit: Refactored PolicyEnforcerBase::decrementNumQueuedWorkOrders.

Refactored PolicyEnforcerBase::decrementNumQueuedWorkOrders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/76e37cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/76e37cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/76e37cab

Branch: refs/heads/quickstep_partition_parser_support
Commit: 76e37cabcf24ff4fec7e50d5bd2b590fb9fe93d2
Parents: 37a78cb
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 18 12:20:12 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 20 19:29:11 2016 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  1 +
 query_execution/PolicyEnforcerBase.cpp        |  4 ++--
 query_execution/PolicyEnforcerBase.hpp        |  6 ++++--
 query_execution/PolicyEnforcerDistributed.hpp |  5 +++--
 query_execution/PolicyEnforcerSingleNode.cpp  |  4 ----
 query_execution/PolicyEnforcerSingleNode.hpp  |  7 +++++--
 query_execution/QueryExecutionMessages.proto  |  3 +++
 query_execution/QueryExecutionTypedefs.hpp    |  3 ++-
 query_execution/Shiftboss.cpp                 |  8 +++++++-
 query_execution/Shiftboss.hpp                 |  1 +
 query_execution/Worker.cpp                    | 14 ++++++++++++++
 query_execution/Worker.hpp                    |  9 +++++++++
 12 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b5e07df..0f6c282 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -160,6 +160,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_PolicyEnforcerBase
+                      quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryManagerBase
                       quickstep_queryexecution_QueryManagerSingleNode

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 4e8c782..6e922a8 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -50,7 +50,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       // WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
-      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+      decrementNumQueuedWorkOrders(proto);
 
       if (profile_individual_workorders_) {
         recordTimeForWorkOrder(proto);
@@ -69,7 +69,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       // rebuild WorkOrder. It can be accessed in this scope.
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
-      decrementNumQueuedWorkOrders(proto.worker_thread_index());
+      decrementNumQueuedWorkOrders(proto);
 
       query_id = proto.query_id();
       DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 7009a0a..414367e 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -196,9 +196,11 @@ class PolicyEnforcerBase {
   /**
    * @brief Decrement the number of queued workorders for the given worker by 1.
    *
-   * @param worker_index The logical ID of the given worker.
+   * @param proto The completion message proto received after the WorkOrder
+   *        execution.
    **/
-  virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+  virtual void decrementNumQueuedWorkOrders(
+      const serialization::WorkOrderCompletionMessage &proto) = 0;
 
   DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index bce3e0c..0bf249c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "utility/Macros.hpp"
 
@@ -89,8 +90,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
  private:
-  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
-    shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+  void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
+    shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());
   }
 
   void onQueryCompletion(QueryManagerBase *query_manager) override;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
index 0bdb9b1..0aa2ca8 100644
--- a/query_execution/PolicyEnforcerSingleNode.cpp
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -108,8 +108,4 @@ bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
   }
 }
 
-void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(const std::size_t worker_index) {
-  worker_directory_->decrementNumQueuedWorkOrders(worker_index);
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
index 870df95..16c7a0d 100644
--- a/query_execution/PolicyEnforcerSingleNode.hpp
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -25,6 +25,8 @@
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/WorkerDirectory.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -36,7 +38,6 @@ namespace quickstep {
 class CatalogDatabaseLite;
 class QueryHandle;
 class StorageManager;
-class WorkerDirectory;
 class WorkerMessage;
 
 /** \addtogroup QueryExecution
@@ -89,7 +90,9 @@ class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
       std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
 
  private:
-  void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+  void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
+    worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
+  }
 
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 165a194..e6d741a 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -46,6 +46,9 @@ message WorkOrderCompletionMessage {
   // Epoch time in microseconds.
   optional uint64 execution_start_time = 5;
   optional uint64 execution_end_time = 6;
+
+  // Required in the distributed version.
+  optional uint64 shiftboss_index = 7;
 }
 
 message CatalogRelationNewBlockMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 22c0ae1..fb9a9d6 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -79,7 +79,8 @@ enum QueryExecutionMessageType : message_type_id {
 
 #ifdef QUICKSTEP_DISTRIBUTED
   kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
-  kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss.
+  kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss, or from
+                                          // Shiftboss to Worker.
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 30b2ae2..ed4bade 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -273,7 +273,7 @@ void Shiftboss::registerWithForeman() {
 }
 
 void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  const AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+  AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
 
@@ -286,6 +286,12 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
   shiftboss_index_ = proto.shiftboss_index();
+
+  // Forward this message to Workers regarding <shiftboss_index_>.
+  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+                                       worker_addresses_,
+                                       move(annotated_message.tagged_message),
+                                       bus_);
 }
 
 void Shiftboss::processQueryInitiateMessage(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 442e61e..6538d48 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -102,6 +102,7 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
 
     // Message sent to Worker.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
 
     // Forward the following message types from Foreman to Workers.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 0db17b4..31eb964 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -29,6 +29,7 @@
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/WorkOrder.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "threading/ThreadUtil.hpp"
@@ -62,6 +63,15 @@ void Worker::run() {
         bus_->Receive(worker_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
     switch (tagged_message.message_type()) {
+#ifdef QUICKSTEP_DISTRIBUTED
+      case kShiftbossRegistrationResponseMessage: {
+        serialization::ShiftbossRegistrationResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        shiftboss_index_ = proto.shiftboss_index();
+        break;
+      }
+#endif  // QUICKSTEP_DISTRIBUTED
       case kWorkOrderMessage: {
         WorkOrderCompletionMessage proto;
         executeWorkOrderHelper(tagged_message, &proto);
@@ -136,6 +146,10 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   proto->set_worker_thread_index(worker_thread_index_);
   proto->set_execution_start_time(execution_start_time);
   proto->set_execution_end_time(execution_end_time);
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  proto->set_shiftboss_index(shiftboss_index_);
+#endif  // QUICKSTEP_DISTRIBUTED
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index fec2242..feb7c7d 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -24,6 +24,7 @@
 #include <cstdint>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -75,6 +76,10 @@ class Worker : public Thread {
     bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
     bus_->RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
     bus_->RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
+
+#ifdef QUICKSTEP_DISTRIBUTED
+    bus_->RegisterClientAsReceiver(worker_client_id_, kShiftbossRegistrationResponseMessage);
+#endif  // QUICKSTEP_DISTRIBUTED
   }
 
   ~Worker() override {}
@@ -132,6 +137,10 @@ class Worker : public Thread {
   const int cpu_id_;
   client_id worker_client_id_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  std::size_t shiftboss_index_;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(Worker);
 };