You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/05 07:56:57 UTC
incubator-quickstep git commit: Removed the unnecessary messages for
saving query results in the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-save-query-result [created] 8643add3f
Removed the unnecessary messages for saving query results in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8643add3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8643add3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8643add3
Branch: refs/heads/dist-save-query-result
Commit: 8643add3fc22a85e3db02b80a4d290bd9b5b323e
Parents: 2e7f446
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Mar 4 23:56:48 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Mar 4 23:56:48 2017 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 43 ----------
query_execution/ForemanDistributed.hpp | 6 --
query_execution/PolicyEnforcerDistributed.cpp | 95 ++++++++--------------
query_execution/QueryExecutionMessages.proto | 15 ----
query_execution/QueryExecutionTypedefs.hpp | 3 -
query_execution/Shiftboss.cpp | 42 ----------
6 files changed, 35 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 3d47fb6..b59edb5 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -86,7 +86,6 @@ ForemanDistributed::ForemanDistributed(
kWorkOrderMessage,
kInitiateRebuildMessage,
kQueryTeardownMessage,
- kSaveQueryResultMessage,
kQueryExecutionSuccessMessage,
kCommandResponseMessage,
kPoisonMessage};
@@ -106,7 +105,6 @@ ForemanDistributed::ForemanDistributed(
kWorkOrderCompleteMessage,
kRebuildWorkOrderCompleteMessage,
kWorkOrderFeedbackMessage,
- kSaveQueryResultResponseMessage,
kPoisonMessage};
for (const auto message_type : receiver_message_types) {
@@ -203,20 +201,6 @@ void ForemanDistributed::run() {
processInitiateRebuildResponseMessage(tagged_message);
break;
}
- case kSaveQueryResultResponseMessage: {
- S::SaveQueryResultResponseMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- const std::size_t query_id = proto.query_id();
- query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index());
-
- // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
- if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
- processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
- query_result_saved_shiftbosses_.erase(query_id);
- }
- break;
- }
case kPoisonMessage: {
if (policy_enforcer_->hasQueries()) {
LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
@@ -466,31 +450,4 @@ void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shi
CHECK(send_status == MessageBus::SendStatus::kOK);
}
-void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
- const relation_id result_relation_id) {
- S::QueryExecutionSuccessMessage proto;
- proto.mutable_result_relation()->MergeFrom(
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryExecutionSuccessMessage);
- free(proto_bytes);
-
- // Notify the CLI regarding the query result.
- DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
- << kQueryExecutionSuccessMessage
- << "') to CLI with TMB client id " << cli_id;
- const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- cli_id,
- move(message));
- CHECK(send_status == MessageBus::SendStatus::kOK);
-}
-
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 4053b9d..2ce9c5d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -115,9 +115,6 @@ class ForemanDistributed final : public ForemanBase {
void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
const std::size_t work_order_capacity);
- void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
- const relation_id result_relation_id);
-
/**
* @brief Check if we can collect new messages from the PolicyEnforcer.
*
@@ -136,9 +133,6 @@ class ForemanDistributed final : public ForemanBase {
DataExchangerAsync data_exchanger_;
std::unique_ptr<StorageManager> storage_manager_;
- // From a query id to a set of Shiftbosses that save query result.
- std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
-
DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 424452e..e9faf8c 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -64,6 +64,7 @@ using std::string;
using std::unique_ptr;
using std::vector;
+using tmb::MessageBus;
using tmb::TaggedMessage;
namespace quickstep {
@@ -269,83 +270,57 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- if (query_result_relation == nullptr) {
- if (query_processor_) {
- query_processor_->saveCatalog();
- }
-
- // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
- serialization::QueryTeardownMessage proto;
- proto.set_query_id(query_id);
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+ if (query_result_relation) {
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+ if (analyze_query_info) {
+ processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+ } else {
+ S::QueryExecutionSuccessMessage proto;
+ proto.mutable_result_relation()->MergeFrom(query_result_relation->getProto());
- TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
- free(proto_bytes);
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
- << "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryExecutionSuccessMessage);
+ free(proto_bytes);
- TaggedMessage cli_message(kQueryExecutionSuccessMessage);
+ // Notify the CLI regarding the query result.
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
+ << kQueryExecutionSuccessMessage
+ << "') to CLI with TMB client id " << cli_id;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+ }
+ } else {
+ if (query_processor_) {
+ query_processor_->saveCatalog();
+ }
// Notify the CLI query execution successfully.
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
<< kQueryExecutionSuccessMessage
<< "') to CLI with TMB client id " << cli_id;
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- cli_id,
- move(cli_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
- return;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id,
+ TaggedMessage(kQueryExecutionSuccessMessage));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
}
- const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
- if (analyze_query_info) {
- processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
-
- // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
- S::QueryTeardownMessage proto;
- proto.set_query_id(query_id);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
- << "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
- return;
- }
-
- // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
- S::SaveQueryResultMessage proto;
+ // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+ S::QueryTeardownMessage proto;
proto.set_query_id(query_id);
- proto.set_relation_id(query_result_relation->getID());
-
- const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
- for (const block_id block : blocks) {
- proto.add_blocks(block);
- }
-
- proto.set_cli_id(cli_id);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
free(proto_bytes);
- // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
- DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
<< "') to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
}
@@ -439,9 +414,9 @@ void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id c
DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
<< "') to CLI with TMB client id " << cli_id;
- const tmb::MessageBus::SendStatus send_status =
+ const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ CHECK(send_status == MessageBus::SendStatus::kOK);
completed_analyze_relations_.erase(cli_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index a45e8df..dd3c9a7 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -119,21 +119,6 @@ message QueryTeardownMessage {
required uint64 query_id = 1;
}
-message SaveQueryResultMessage {
- required uint64 query_id = 1;
- required int32 relation_id = 2;
- repeated fixed64 blocks = 3 [packed=true];
-
- required uint32 cli_id = 4; // tmb::client_id.
-}
-
-message SaveQueryResultResponseMessage {
- required uint64 query_id = 1;
- required int32 relation_id = 2;
- required uint32 cli_id = 3; // tmb::client_id.
- required uint64 shiftboss_index = 4;
-}
-
message CommandResponseMessage {
required string command_response = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 081852f..afdac92 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -100,9 +100,6 @@ enum QueryExecutionMessageType : message_type_id {
kQueryTeardownMessage, // From Foreman to Shiftboss.
- kSaveQueryResultMessage, // From Foreman to Shiftboss.
- kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
-
kQueryExecutionSuccessMessage, // From Foreman to CLI.
// From Foreman / Conductor to CLI.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2f7dc3c..fa922f0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -107,9 +107,6 @@ Shiftboss::Shiftboss(tmb::MessageBus *bus_global,
bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
- bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
- bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
-
// Message sent to Worker.
bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
@@ -230,45 +227,6 @@ void Shiftboss::run() {
query_contexts_.erase(proto.query_id());
break;
}
- case kSaveQueryResultMessage: {
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
- serialization::SaveQueryResultMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- for (int i = 0; i < proto.blocks_size(); ++i) {
- storage_manager_->saveBlockOrBlob(proto.blocks(i));
- }
-
- // Clean up query execution states, i.e., QueryContext.
- query_contexts_.erase(proto.query_id());
-
- serialization::SaveQueryResultResponseMessage proto_response;
- proto_response.set_query_id(proto.query_id());
- proto_response.set_relation_id(proto.relation_id());
- proto_response.set_cli_id(proto.cli_id());
- proto_response.set_shiftboss_index(shiftboss_index_);
-
- const size_t proto_response_length = proto_response.ByteSize();
- char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
- CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
- TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
- proto_response_length,
- kSaveQueryResultResponseMessage);
- free(proto_response_bytes);
-
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
- << "') to Foreman with TMB client ID " << foreman_client_id_;
- const MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(bus_global_,
- shiftboss_client_id_global_,
- foreman_client_id_,
- move(message_response));
- CHECK(send_status == MessageBus::SendStatus::kOK);
- break;
- }
case kPoisonMessage: {
DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
<< "') forwarded PoisonMessage (typed '" << kPoisonMessage