You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/08/01 14:38:57 UTC
[4/6] incubator-quickstep git commit: Renamed a QueryExecutionMessage.
Renamed a QueryExecutionMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aaecc76b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aaecc76b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aaecc76b
Branch: refs/heads/SQL-window-aggregation
Commit: aaecc76b7bea85f46bf06dc2e63fccf43636d7eb
Parents: 33554c3
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Thu Jul 28 11:11:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Jul 28 11:11:33 2016 -0700
----------------------------------------------------------------------
query_execution/QueryExecutionMessages.proto | 4 +-
query_execution/QueryExecutionTypedefs.hpp | 4 +-
query_execution/Shiftboss.cpp | 45 +++++++++++------------
query_execution/Shiftboss.hpp | 4 +-
4 files changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 591ca6c..308d736 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -113,12 +113,12 @@ message InitiateRebuildResponseMessage {
required uint64 num_rebuild_work_orders = 3;
}
-message QueryResultRelationMessage {
+message SaveQueryResultMessage {
required int32 relation_id = 1;
repeated fixed64 blocks = 2 [packed=true];
}
-message QueryResultRelationResponseMessage {
+message SaveQueryResultResponseMessage {
required int32 relation_id = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index d73d4ee..b67209f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,8 +81,8 @@ enum QueryExecutionMessageType : message_type_id {
kInitiateRebuildMessage, // From Foreman to Shiftboss.
kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
- kQueryResultRelationMessage, // From Foreman to Shiftboss.
- kQueryResultRelationResponseMessage, // From Shiftboss to Foreman.
+ kSaveQueryResultMessage, // From Foreman to Shiftboss.
+ kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index af56306..7f655c6 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -149,11 +149,10 @@ void Shiftboss::run() {
move(annotated_message.tagged_message));
break;
}
- case kQueryResultRelationMessage: {
- // TODO(zuyu): Rename to kSaveQueryResultMessage.
+ case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
- serialization::QueryResultRelationMessage proto;
+ serialization::SaveQueryResultMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
for (int i = 0; i < proto.blocks_size(); ++i) {
@@ -168,25 +167,25 @@ void Shiftboss::run() {
}
}
- serialization::QueryResultRelationResponseMessage ack_proto;
- ack_proto.set_relation_id(proto.relation_id());
+ serialization::SaveQueryResultResponseMessage proto_response;
+ proto_response.set_relation_id(proto.relation_id());
- const size_t ack_proto_length = ack_proto.ByteSize();
- char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
- CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
+ const size_t proto_response_length = proto_response.ByteSize();
+ char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+ CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
- TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
- ack_proto_length,
- kQueryResultRelationResponseMessage);
- free(ack_proto_bytes);
+ TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+ proto_response_length,
+ kSaveQueryResultResponseMessage);
+ free(proto_response_bytes);
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
- << ") to Foreman";
+ << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+ << "') to Foreman";
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
- move(ack_message));
+ move(message_response));
break;
}
case kPoisonMessage: {
@@ -280,15 +279,15 @@ void Shiftboss::processQueryInitiateMessage(
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryInitiateResponseMessage);
+ TaggedMessage message_response(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryInitiateResponseMessage);
free(proto_bytes);
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
- move(ack_message));
+ move(message_response));
}
void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -317,15 +316,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
- proto_length,
- kInitiateRebuildResponseMessage);
+ TaggedMessage message_response(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kInitiateRebuildResponseMessage);
free(proto_bytes);
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
- move(ack_message));
+ move(message_response));
for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
// NOTE(zuyu): Worker releases the memory after the execution of
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 096ab74..9e24d62 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -112,8 +112,8 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);