You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/12/18 01:34:40 UTC
[12/42] incubator-quickstep git commit: Refactored PolicyEnforcerBase::decrementNumQueuedWorkOrders.
Refactored PolicyEnforcerBase::decrementNumQueuedWorkOrders.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/76e37cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/76e37cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/76e37cab
Branch: refs/heads/exact-filter
Commit: 76e37cabcf24ff4fec7e50d5bd2b590fb9fe93d2
Parents: 37a78cb
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 18 12:20:12 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 20 19:29:11 2016 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/PolicyEnforcerBase.cpp | 4 ++--
query_execution/PolicyEnforcerBase.hpp | 6 ++++--
query_execution/PolicyEnforcerDistributed.hpp | 5 +++--
query_execution/PolicyEnforcerSingleNode.cpp | 4 ----
query_execution/PolicyEnforcerSingleNode.hpp | 7 +++++--
query_execution/QueryExecutionMessages.proto | 3 +++
query_execution/QueryExecutionTypedefs.hpp | 3 ++-
query_execution/Shiftboss.cpp | 8 +++++++-
query_execution/Shiftboss.hpp | 1 +
query_execution/Worker.cpp | 14 ++++++++++++++
query_execution/Worker.hpp | 9 +++++++++
12 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b5e07df..0f6c282 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -160,6 +160,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
glog
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryManagerBase
quickstep_queryexecution_QueryManagerSingleNode
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 4e8c782..6e922a8 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -50,7 +50,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
// WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
- decrementNumQueuedWorkOrders(proto.worker_thread_index());
+ decrementNumQueuedWorkOrders(proto);
if (profile_individual_workorders_) {
recordTimeForWorkOrder(proto);
@@ -69,7 +69,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
// rebuild WorkOrder. It can be accessed in this scope.
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
- decrementNumQueuedWorkOrders(proto.worker_thread_index());
+ decrementNumQueuedWorkOrders(proto);
query_id = proto.query_id();
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 7009a0a..414367e 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -196,9 +196,11 @@ class PolicyEnforcerBase {
/**
* @brief Decrement the number of queued workorders for the given worker by 1.
*
- * @param worker_index The logical ID of the given worker.
+ * @param proto The completion message proto received after the WorkOrder
+ * execution.
**/
- virtual void decrementNumQueuedWorkOrders(const std::size_t worker_index) = 0;
+ virtual void decrementNumQueuedWorkOrders(
+ const serialization::WorkOrderCompletionMessage &proto) = 0;
DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index bce3e0c..0bf249c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -20,6 +20,7 @@
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/ShiftbossDirectory.hpp"
#include "utility/Macros.hpp"
@@ -89,8 +90,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
private:
- void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
- shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+ void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
+ shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());
}
void onQueryCompletion(QueryManagerBase *query_manager) override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp
index 0bdb9b1..0aa2ca8 100644
--- a/query_execution/PolicyEnforcerSingleNode.cpp
+++ b/query_execution/PolicyEnforcerSingleNode.cpp
@@ -108,8 +108,4 @@ bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
}
}
-void PolicyEnforcerSingleNode::decrementNumQueuedWorkOrders(const std::size_t worker_index) {
- worker_directory_->decrementNumQueuedWorkOrders(worker_index);
-}
-
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/PolicyEnforcerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp
index 870df95..16c7a0d 100644
--- a/query_execution/PolicyEnforcerSingleNode.hpp
+++ b/query_execution/PolicyEnforcerSingleNode.hpp
@@ -25,6 +25,8 @@
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/WorkerDirectory.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -36,7 +38,6 @@ namespace quickstep {
class CatalogDatabaseLite;
class QueryHandle;
class StorageManager;
-class WorkerDirectory;
class WorkerMessage;
/** \addtogroup QueryExecution
@@ -89,7 +90,9 @@ class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
private:
- void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+ void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
+ worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index());
+ }
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 165a194..e6d741a 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -46,6 +46,9 @@ message WorkOrderCompletionMessage {
// Epoch time in microseconds.
optional uint64 execution_start_time = 5;
optional uint64 execution_end_time = 6;
+
+ // Required in the distributed version.
+ optional uint64 shiftboss_index = 7;
}
message CatalogRelationNewBlockMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 22c0ae1..fb9a9d6 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -79,7 +79,8 @@ enum QueryExecutionMessageType : message_type_id {
#ifdef QUICKSTEP_DISTRIBUTED
kShiftbossRegistrationMessage, // From Shiftboss to Foreman.
- kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss.
+ kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss, or from
+ // Shiftboss to Worker.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 30b2ae2..ed4bade 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -273,7 +273,7 @@ void Shiftboss::registerWithForeman() {
}
void Shiftboss::processShiftbossRegistrationResponseMessage() {
- const AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+ AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
@@ -286,6 +286,12 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
shiftboss_index_ = proto.shiftboss_index();
+
+ // Forward this message to Workers regarding <shiftboss_index_>.
+ QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+ worker_addresses_,
+ move(annotated_message.tagged_message),
+ bus_);
}
void Shiftboss::processQueryInitiateMessage(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 442e61e..6538d48 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -102,6 +102,7 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
// Message sent to Worker.
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
// Forward the following message types from Foreman to Workers.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 0db17b4..31eb964 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -29,6 +29,7 @@
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "relational_operators/WorkOrder.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "threading/ThreadUtil.hpp"
@@ -62,6 +63,15 @@ void Worker::run() {
bus_->Receive(worker_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
switch (tagged_message.message_type()) {
+#ifdef QUICKSTEP_DISTRIBUTED
+ case kShiftbossRegistrationResponseMessage: {
+ serialization::ShiftbossRegistrationResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ shiftboss_index_ = proto.shiftboss_index();
+ break;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
case kWorkOrderMessage: {
WorkOrderCompletionMessage proto;
executeWorkOrderHelper(tagged_message, &proto);
@@ -136,6 +146,10 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
proto->set_worker_thread_index(worker_thread_index_);
proto->set_execution_start_time(execution_start_time);
proto->set_execution_end_time(execution_end_time);
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ proto->set_shiftboss_index(shiftboss_index_);
+#endif // QUICKSTEP_DISTRIBUTED
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/76e37cab/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index fec2242..feb7c7d 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -24,6 +24,7 @@
#include <cstdint>
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"
@@ -75,6 +76,10 @@ class Worker : public Thread {
bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
bus_->RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage);
bus_->RegisterClientAsReceiver(worker_client_id_, kPoisonMessage);
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ bus_->RegisterClientAsReceiver(worker_client_id_, kShiftbossRegistrationResponseMessage);
+#endif // QUICKSTEP_DISTRIBUTED
}
~Worker() override {}
@@ -132,6 +137,10 @@ class Worker : public Thread {
const int cpu_id_;
client_id worker_client_id_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ std::size_t shiftboss_index_;
+#endif // QUICKSTEP_DISTRIBUTED
+
DISALLOW_COPY_AND_ASSIGN(Worker);
};