You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/04 19:03:55 UTC
[2/4] incubator-quickstep git commit: Clean up query execution states
in Shiftboss.
Clean up query execution states in Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9e158490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9e158490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9e158490
Branch: refs/heads/new-distributed-exe-test
Commit: 9e1584901ee3cd3658c426d8c470ed5aabe046d5
Parents: 220fa06
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Jul 30 10:59:31 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Jul 30 10:59:31 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 1 +
query_execution/PolicyEnforcerDistributed.cpp | 42 +++++++++++++++++-----
query_execution/QueryExecutionMessages.proto | 4 +++
query_execution/QueryExecutionTypedefs.hpp | 2 ++
query_execution/Shiftboss.cpp | 9 +++++
query_execution/Shiftboss.hpp | 3 ++
6 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 1c0fba8..2632113 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -67,6 +67,7 @@ ForemanDistributed::ForemanDistributed(
kQueryInitiateMessage,
kWorkOrderMessage,
kInitiateRebuildMessage,
+ kQueryTeardownMessage,
kSaveQueryResultMessage,
kQueryExecutionSuccessMessage,
kPoisonMessage};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 59df3de..087e0b3 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -196,29 +196,53 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+ const tmb::client_id cli_id = query_handle->getClientId();
+ const std::size_t query_id = query_handle->query_id();
+
if (query_result == nullptr) {
- // TODO(zuyu): notify Shiftboss to remove QueryContext.
- TaggedMessage message(kQueryExecutionSuccessMessage);
+ // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+ serialization::QueryTeardownMessage proto;
+ proto.set_query_id(query_id);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryTeardownMessage);
+
+ // TODO(zuyu): Support multiple shiftbosses.
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to Shiftboss 0";
+ tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss";
- const tmb::client_id cli_id = query_handle->getClientId();
+ TaggedMessage cli_message(kQueryExecutionSuccessMessage);
- // Notify the CLI regarding the query execution result.
+ // Notify the CLI query execution successfully.
LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
<< "') to CLI with TMB client id " << cli_id;
- const tmb::MessageBus::SendStatus send_status =
+ send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
cli_id,
- move(message));
+ move(cli_message));
CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
<< "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
<< " to CLI with TMB client ID " << cli_id;
return;
}
- // SaveQueryResultMessage implies QueryContext clean up in Shiftboss.
+ // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
S::SaveQueryResultMessage proto;
- proto.set_query_id(query_handle->query_id());
+ proto.set_query_id(query_id);
proto.set_relation_id(query_result->getID());
const vector<block_id> blocks(query_result->getBlocksSnapshot());
@@ -226,7 +250,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
proto.add_blocks(block);
}
- proto.set_cli_id(query_handle->getClientId());
+ proto.set_cli_id(cli_id);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 99de75c..b182b87 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -114,6 +114,10 @@ message InitiateRebuildResponseMessage {
required uint64 shiftboss_index = 4;
}
+message QueryTeardownMessage {
+ required uint64 query_id = 1;
+}
+
message SaveQueryResultMessage {
required uint64 query_id = 1;
required int32 relation_id = 2;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0d43237..5706c82 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,8 @@ enum QueryExecutionMessageType : message_type_id {
kInitiateRebuildMessage, // From Foreman to Shiftboss.
kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+ kQueryTeardownMessage, // From Foreman to Shiftboss.
+
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 925dc1f..978fe34 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -157,6 +157,15 @@ void Shiftboss::run() {
<< " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9e24d62..3943938 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -115,6 +115,9 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ // Clean up query execution states, i.e., QueryContext.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
+
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
// Stop all workers.