You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/08 17:36:31 UTC
[1/2] incubator-quickstep git commit: Minor updates to Shiftboss.
[Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-policy-enforcer dd5f696d5 -> 3c2749eaf (forced update)
Minor updates to Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e443b2b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e443b2b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e443b2b8
Branch: refs/heads/dist-policy-enforcer
Commit: e443b2b8409a128cc5ba2bdf1a6d01ebf79e7e74
Parents: bd01748
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 10:33:10 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 10:33:10 2016 -0700
----------------------------------------------------------------------
query_execution/QueryExecutionMessages.proto | 1 +
query_execution/Shiftboss.cpp | 89 ++++++++++++++++-------
query_execution/Shiftboss.hpp | 3 +
3 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index f6a8b73..f680d35 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -87,6 +87,7 @@ message ShiftbossRegistrationMessage {
}
message ShiftbossRegistrationResponseMessage {
+ required uint64 shiftboss_index = 1;
}
message QueryInitiateMessage {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 120e8fb..24c91fe 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -56,6 +56,7 @@ using std::string;
using std::unique_ptr;
using std::vector;
+using tmb::MessageBus;
using tmb::TaggedMessage;
namespace quickstep {
@@ -78,6 +79,13 @@ void Shiftboss::run() {
switch (annotated_message.tagged_message.message_type()) {
case kShiftbossRegistrationResponseMessage: {
foreman_client_id_ = annotated_message.sender;
+
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::ShiftbossRegistrationResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ shiftboss_index_ = proto.shiftboss_index();
break;
}
case kQueryInitiateMessage: {
@@ -117,10 +125,14 @@ void Shiftboss::run() {
<< "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
<< "') from Foreman to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
break;
}
case kInitiateRebuildMessage: {
@@ -147,10 +159,14 @@ void Shiftboss::run() {
<< "' message from worker (client " << annotated_message.sender << ") to Foreman";
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(annotated_message.tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(annotated_message.tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kSaveQueryResultMessage: {
@@ -178,10 +194,14 @@ void Shiftboss::run() {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
<< "') to Foreman";
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kPoisonMessage: {
@@ -192,12 +212,14 @@ void Shiftboss::run() {
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
- tmb::MessageBus::SendStatus send_status =
+ const MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_,
worker_addresses_,
broadcast_style,
move(annotated_message.tagged_message));
- DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to All workers";
return;
}
default: {
@@ -280,10 +302,14 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
}
void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -307,8 +333,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
proto.set_query_id(query_id);
proto.set_operator_index(op_index);
proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
- // TODO(zuyu): Multiple Shiftboss support.
- proto.set_shiftboss_index(0);
+ proto.set_shiftboss_index(shiftboss_index_);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -319,10 +344,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
// NOTE(zuyu): Worker releases the memory after the execution of
@@ -347,10 +376,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
<< "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
<< "') to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ CHECK(send_status == MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e443b2b8/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 32d2408..9464a4d 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -223,6 +223,9 @@ class Shiftboss : public Thread {
tmb::client_id shiftboss_client_id_, foreman_client_id_;
+ // Unique per Shiftboss instance.
+ std::uint64_t shiftboss_index_;
+
// TMB recipients for all workers managed by this Shiftboss.
tmb::Address worker_addresses_;
[2/2] incubator-quickstep git commit: Added PolicyEnforcer
implementation for the distributed version.
Posted by zu...@apache.org.
Added PolicyEnforcer implementation for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3c2749ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3c2749ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3c2749ea
Branch: refs/heads/dist-policy-enforcer
Commit: 3c2749eafcff25283915bdef21822056d36f5281
Parents: e443b2b
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Thu Aug 4 11:45:51 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 10:35:45 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 24 ++
query_execution/PolicyEnforcerBase.cpp | 2 +
query_execution/PolicyEnforcerBase.hpp | 7 +
query_execution/PolicyEnforcerDistributed.cpp | 279 +++++++++++++++++++++
query_execution/PolicyEnforcerDistributed.hpp | 113 +++++++++
query_execution/QueryExecutionMessages.proto | 16 +-
query_execution/QueryExecutionTypedefs.hpp | 5 +
query_execution/QueryManagerBase.cpp | 3 +-
query_execution/QueryManagerBase.hpp | 11 +-
9 files changed, 456 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4b180e3..74fcafb 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,9 @@ endif()
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
@@ -110,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_Catalog_proto
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryContext_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
glog
quickstep_catalog_CatalogTypedefs
@@ -293,6 +316,7 @@ target_link_libraries(quickstep_queryexecution
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index bf6edf9..78f7b44 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -136,6 +136,8 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
}
if (admitted_queries_[query_id]->queryStatus(op_index) ==
QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+ onQueryCompletion(admitted_queries_[query_id].get());
+
removeQuery(query_id);
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index c75a531..e95799e 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -140,6 +140,13 @@ class PolicyEnforcerBase {
static constexpr std::size_t kMaxConcurrentQueries = 1;
/**
+ * @brief Add custom actions upon the completion of a query.
+ *
+ * @param query_manager The query manager.
+ **/
+ virtual void onQueryCompletion(QueryManagerBase *query_manager) {}
+
+ /**
* @brief Record the execution time for a finished WorkOrder.
*
* TODO(harshad) - Extend the functionality to rebuild work orders.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..6d0de47
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,279 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+ " can be allocated in a single round of dispatch of messages to"
+ " the workers.");
+
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+ vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+ // Collect WorkOrderMessages from each admitted query until it has none
+ // available right now, or its per-query share of this dispatch round
+ // has been collected. The output vector must be empty on entry.
+ DCHECK(work_order_messages->empty());
+ // TODO(harshad) - Make this function generic enough so that it
+ // works well when multiple queries are getting executed.
+ if (admitted_queries_.empty()) {
+ LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ return;
+ }
+
+ const std::size_t per_query_share =
+ FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+ DCHECK_GT(per_query_share, 0u);
+
+ vector<std::size_t> finished_queries_ids;
+
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+ DCHECK(curr_query_manager != nullptr);
+ std::size_t messages_collected_curr_query = 0;
+ while (messages_collected_curr_query < per_query_share) {
+ S::WorkOrderMessage *next_work_order_message =
+ static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+ if (next_work_order_message != nullptr) {
+ ++messages_collected_curr_query;
+ work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+ } else {
+ // No more work orders from the current query at this time.
+ // Check if the query's execution is over.
+ if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ // Record the finished query; it is removed after this loop ends.
+ finished_queries_ids.push_back(admitted_query_info.first);
+ }
+ break;
+ }
+ }
+ }
+ for (const std::size_t finished_qid : finished_queries_ids) {
+ onQueryCompletion(admitted_queries_[finished_qid].get());
+ removeQuery(finished_qid);
+ }
+}
+
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+ if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+ // There is room for another concurrently admitted query.
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+ // NOTE(zuyu): Initiate the query in Shiftboss before constructing the
+ // 'QueryManager'. Otherwise, an InitiateRebuildMessage may be sent
+ // before the 'QueryContext' is initialized.
+ initiateQueryInShiftboss(query_handle);
+
+ // No admitted query has this ID, so create its QueryManager.
+ admitted_queries_[query_id].reset(
+ new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+ return true;
+ } else {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+ return false;
+ }
+ } else {
+ // At capacity: queue the query until an admitted query finishes.
+ waiting_queries_.push(query_handle);
+ return false;
+ }
+}
+
+void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
+ S::InitiateRebuildResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ const std::size_t query_id = proto.query_id();
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ // NOTE(review): the lookup below uses operator[], so it relies on the DCHECK above holding.
+ QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+ // Report the rebuild work orders to the query manager and, via addNumQueuedWorkOrders, to the directory.
+ const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+ query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders);
+ shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+
+ if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ onQueryCompletion(query_manager);
+
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
+ S::QueryInitiateMessage proto;
+ proto.set_query_id(query_handle->query_id());
+ proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto());
+ proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto());
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryInitiateMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+ << "') to Shiftboss 0";
+
+ // TODO(zuyu): Support multiple Shiftbosses; only Shiftboss 0 is addressed for now.
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+
+ // Wait for the QueryInitiateResponseMessage from Shiftboss.
+ const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
+ LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
+ // Only check that the response parses; its contents are not used here.
+ S::QueryInitiateResponseMessage proto_response;
+ CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+}
+
+void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
+ const QueryHandle *query_handle = query_manager->query_handle();
+
+ const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+ const tmb::client_id cli_id = query_handle->getClientId();
+ const std::size_t query_id = query_handle->query_id();
+
+ if (query_result == nullptr) {
+ // No result relation: clean up query execution states, i.e., QueryContext, in Shiftboss.
+ serialization::QueryTeardownMessage proto;
+ proto.set_query_id(query_id);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryTeardownMessage);
+ free(proto_bytes);  // As on the other send paths: release the buffer once the TaggedMessage is built.
+ // TODO(zuyu): Support multiple shiftbosses.
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to Shiftboss 0";
+ tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss";
+
+ TaggedMessage cli_message(kQueryExecutionSuccessMessage);
+
+ // Notify the CLI that the query executed successfully.
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+ << "') to CLI with TMB client id " << cli_id;
+ send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ cli_id,
+ move(cli_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to CLI with TMB client ID " << cli_id;
+ return;
+ }
+
+ // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
+ S::SaveQueryResultMessage proto;
+ proto.set_query_id(query_id);
+ proto.set_relation_id(query_result->getID());
+
+ const vector<block_id> blocks(query_result->getBlocksSnapshot());
+ for (const block_id block : blocks) {
+ proto.add_blocks(block);
+ }
+
+ proto.set_cli_id(cli_id);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kSaveQueryResultMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+ << "') to Shiftboss 0";
+ // TODO(zuyu): Support multiple shiftbosses.
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..16ebe36
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,113 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb {
+class MessageBus;
+class TaggedMessage;
+}
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class QueryManagerBase;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The PolicyEnforcer used in the distributed version. It ensures that a
+ * high level policy is maintained in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+ /**
+ * @brief Constructor.
+ * @param foreman_client_id The TMB client ID of the Foreman.
+ * @param catalog_database The CatalogDatabase used.
+ * @param shiftboss_directory The ShiftbossDirectory used.
+ * @param bus The TMB.
+ **/
+ PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+ CatalogDatabaseLite *catalog_database,
+ ShiftbossDirectory *shiftboss_directory,
+ tmb::MessageBus *bus,
+ const bool profile_individual_workorders = false)  // Forwarded to PolicyEnforcerBase.
+ : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+ foreman_client_id_(foreman_client_id),
+ shiftboss_directory_(shiftboss_directory),
+ bus_(bus) {}
+
+ /**
+ * @brief Destructor.
+ **/
+ ~PolicyEnforcerDistributed() override {}
+
+ bool admitQuery(QueryHandle *query_handle) override;
+
+ /**
+ * @brief Get work order messages to be dispatched. These messages come from
+ * the active queries.
+ *
+ * @param work_order_messages The work order messages to be dispatched.
+ **/
+ void getWorkOrderMessages(
+ std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+ /**
+ * @brief Process the initiate rebuild work order response message.
+ *
+ * @param tagged_message The message.
+ **/
+ void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
+
+ private:
+ void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
+ shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+ }
+ // Asks Shiftboss to tear down the finished query, or to save its result relation.
+ void onQueryCompletion(QueryManagerBase *query_manager) override;
+ // Sends a QueryInitiateMessage for the query to Shiftboss and waits for the response.
+ void initiateQueryInShiftboss(QueryHandle *query_handle);
+
+ const tmb::client_id foreman_client_id_;
+
+ ShiftbossDirectory *shiftboss_directory_;
+
+ tmb::MessageBus *bus_;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index f680d35..20b684e 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -120,13 +120,25 @@ message InitiateRebuildResponseMessage {
required uint64 shiftboss_index = 4;
}
+message QueryTeardownMessage {
+ required uint64 query_id = 1;
+}
+
message SaveQueryResultMessage {
- required int32 relation_id = 1;
- repeated fixed64 blocks = 2 [packed=true];
+ required uint64 query_id = 1;
+ required int32 relation_id = 2;
+ repeated fixed64 blocks = 3 [packed=true];
+
+ required uint32 cli_id = 4; // tmb::client_id.
}
message SaveQueryResultResponseMessage {
required int32 relation_id = 1;
+ required uint32 cli_id = 2; // tmb::client_id.
+}
+
+message QueryExecutionSuccessMessage {
+ optional CatalogRelationSchema result_relation = 1;
}
// BlockLocator related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 4643096..d154d84 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -86,9 +86,14 @@ enum QueryExecutionMessageType : message_type_id {
kInitiateRebuildMessage, // From Foreman to Shiftboss.
kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+ kQueryTeardownMessage, // From Foreman to Shiftboss.
+
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
+ // From Foreman to CLI.
+ kQueryExecutionSuccessMessage,
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index c60e323..8e37da8 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -37,7 +37,8 @@ using std::pair;
namespace quickstep {
QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
- : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+ : query_handle_(DCHECK_NOTNULL(query_handle)),
+ query_id_(query_handle->query_id()),
query_dag_(DCHECK_NOTNULL(
DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
num_operators_in_dag_(query_dag_->size()),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 782b8ed..a274742 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -74,6 +74,13 @@ class QueryManagerBase {
virtual ~QueryManagerBase() {}
/**
+ * @brief Get the query handle.
+ **/
+ const QueryHandle* query_handle() const {
+ return query_handle_;
+ }
+
+ /**
* @brief Get the QueryExecutionState for this query.
**/
inline const QueryExecutionState& getQueryExecutionState() const {
@@ -252,9 +259,11 @@ class QueryManagerBase {
return query_exec_state_->hasRebuildInitiated(index);
}
+ const QueryHandle *query_handle_;
+
const std::size_t query_id_;
- DAG<RelationalOperator, bool> *query_dag_;
+ DAG<RelationalOperator, bool> *query_dag_; // Owned by 'query_handle_'.
const dag_node_index num_operators_in_dag_;
// For all nodes, store their receiving dependents.