You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/06 20:18:26 UTC
[29/38] incubator-quickstep git commit: Refactored the debug logs in
the query execution.
Refactored the debug logs in the query execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/686bbb58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/686bbb58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/686bbb58
Branch: refs/heads/reorder-partitioned-hash-join
Commit: 686bbb587035b4980503373461c7df4466115023
Parents: 8643add
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Mar 5 01:54:45 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Mar 5 02:03:57 2017 -0800
----------------------------------------------------------------------
cli/distributed/Cli.cpp | 22 ++++----
cli/distributed/Conductor.cpp | 21 ++++----
cli/distributed/QuickstepDistributedCli.cpp | 2 +
.../DistributedCommandExecutorTestRunner.cpp | 11 +---
query_execution/BlockLocator.cpp | 24 ++++-----
query_execution/BlockLocatorUtil.cpp | 11 ++--
query_execution/ForemanDistributed.cpp | 15 +++---
query_execution/ForemanSingleNode.cpp | 7 ++-
query_execution/PolicyEnforcerDistributed.cpp | 17 ++----
query_execution/QueryExecutionUtil.hpp | 48 ++++++++++++++++-
query_execution/QueryManagerDistributed.cpp | 3 +-
query_execution/Shiftboss.cpp | 56 +++++++++-----------
query_execution/Worker.cpp | 7 +--
query_execution/tests/BlockLocator_unittest.cpp | 9 ++--
relational_operators/DeleteOperator.cpp | 3 +-
relational_operators/RebuildWorkOrder.hpp | 3 +-
relational_operators/UpdateOperator.cpp | 3 +-
relational_operators/WorkOrder.hpp | 3 +-
storage/InsertDestination.cpp | 15 +++---
storage/InsertDestination.hpp | 3 +-
storage/StorageManager.cpp | 21 ++++----
storage/tests/DataExchange_unittest.cpp | 5 +-
22 files changed, 151 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 14880a7..9f48ecc 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -88,9 +88,7 @@ void Cli::init() {
bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
- DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '"
- << kDistributedCliRegistrationMessage
- << "') to all";
+ DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage to all";
tmb::Address all_addresses;
all_addresses.All(true);
@@ -103,12 +101,12 @@ void Cli::init() {
// Wait for Conductor to response.
const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
- CHECK_EQ(kDistributedCliRegistrationResponseMessage,
- cli_reg_response_message.tagged_message.message_type());
+ DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+ cli_reg_response_message.tagged_message.message_type());
conductor_client_id_ = cli_reg_response_message.sender;
- DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage
- << "' message from Conductor (id'" << conductor_client_id_ << "').";
+ DLOG(INFO) << "DistributedCli received DistributedCliRegistrationResponseMessage from Conductor with Client "
+ << conductor_client_id_;
// Setup StorageManager.
bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
@@ -179,8 +177,7 @@ void Cli::run() {
}
}
- DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
- << "') to Conductor";
+ DLOG(INFO) << "DistributedCli sent SqlQueryMessage to Conductor";
S::SqlQueryMessage proto;
proto.set_sql_query(*command_string);
@@ -197,9 +194,10 @@ void Cli::run() {
const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
- DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
- switch (tagged_message.message_type()) {
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ DLOG(INFO) << "DistributedCli received " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Client " << annotated_message.sender;
+ switch (message_type) {
case kCommandResponseMessage: {
S::CommandResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 5fb4453..ef253f1 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -124,17 +124,16 @@ void Conductor::run() {
for (;;) {
AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
const client_id sender = annotated_message.sender;
-
- DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
- << "' message from client " << sender;
- switch (tagged_message.message_type()) {
+ DLOG(INFO) << "Conductor received " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Client " << sender;
+ switch (message_type) {
case kDistributedCliRegistrationMessage: {
TaggedMessage message(kDistributedCliRegistrationResponseMessage);
- DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
- << kDistributedCliRegistrationResponseMessage
- << "') to Distributed CLI " << sender;
+ DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage to DistributedCLI with Client "
+ << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
@@ -201,8 +200,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
- << "') to Distributed CLI " << sender;
+ DLOG(INFO) << "Conductor sent CommandResponseMessage to DistributedCLI with Client " << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
} else {
@@ -232,9 +230,8 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
kQueryExecutionErrorMessage);
free(proto_bytes);
- DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
- << kQueryExecutionErrorMessage
- << "') to Distributed CLI " << sender;
+ DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage to DistributedCLI with Client "
+ << sender;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
index f01cd13..b7edd83 100644
--- a/cli/distributed/QuickstepDistributedCli.cpp
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -75,6 +75,8 @@ int main(int argc, char *argv[]) {
}
role->init();
+ LOG(INFO) << FLAGS_role << " is ready";
+
role->run();
return 0;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/tests/DistributedCommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp
index 66d0767..0df488e 100644
--- a/cli/tests/DistributedCommandExecutorTestRunner.cpp
+++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp
@@ -20,7 +20,6 @@
#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
#include <cstdio>
-#include <functional>
#include <memory>
#include <set>
#include <string>
@@ -63,12 +62,6 @@ namespace quickstep {
class CatalogRelation;
-namespace {
-
-void nop() {}
-
-} // namespace
-
namespace C = cli;
const char *DistributedCommandExecutorTestRunner::kResetOption =
@@ -104,8 +97,8 @@ DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
- test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
+ nullptr /* query_processor */);
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index fa6db51..765021e 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -55,10 +55,11 @@ void BlockLocator::run() {
// message is received.
const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
const client_id sender = annotated_message.sender;
- DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
- << "' message from TMB Client " << sender;
- switch (tagged_message.message_type()) {
+ DLOG(INFO) << "BlockLocator received " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Client " << sender;
+ switch (message_type) {
case kBlockDomainRegistrationMessage: {
serialization::BlockDomainRegistrationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -190,10 +191,8 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
kBlockDomainRegistrationResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') to TMB Client (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
+ << " sent BlockDomainRegistrationResponseMessage to Client " << receiver;
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -220,9 +219,8 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
kLocateBlockResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
+ << " sent LocateBlockResponseMessage to StorageManager with Client " << receiver;
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
@@ -249,10 +247,8 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
kGetPeerDomainNetworkAddressesResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
- << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
- << kGetPeerDomainNetworkAddressesResponseMessage
- << "') to StorageManager (id '" << receiver << "')";
+ DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
+ << " sent GetPeerDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
CHECK(tmb::MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
locator_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/BlockLocatorUtil.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp
index d2d1e96..63a4799 100644
--- a/query_execution/BlockLocatorUtil.cpp
+++ b/query_execution/BlockLocatorUtil.cpp
@@ -65,9 +65,7 @@ block_id_domain getBlockDomain(const std::string &network_address,
kBlockDomainRegistrationMessage);
std::free(proto_bytes);
- DLOG(INFO) << "Client (id '" << cli_id
- << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
- << "') to BlockLocator.";
+ DLOG(INFO) << "Client " << cli_id << " broadcasts BlockDomainRegistrationMessage to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
bus->Send(cli_id, address, style, std::move(message)));
@@ -78,10 +76,9 @@ block_id_domain getBlockDomain(const std::string &network_address,
*locator_client_id = annotated_message.sender;
- DLOG(INFO) << "Client (id '" << cli_id
- << "') received BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') from BlockLocator (id '" << *locator_client_id << "').";
+ DLOG(INFO) << "Client " << cli_id
+ << " received BlockDomainRegistrationResponseMessage from BlockLocator with Client "
+ << *locator_client_id;
S::BlockDomainMessage response_proto;
CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index b59edb5..3903e8a 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -135,8 +135,7 @@ void ForemanDistributed::run() {
const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
- DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
+ DLOG(INFO) << "ForemanDistributed received ShiftbossRegistrationMessage from Client " << annotated_message.sender;
S::ShiftbossRegistrationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -152,8 +151,8 @@ void ForemanDistributed::run() {
bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const tmb::message_type_id message_type = tagged_message.message_type();
- DLOG(INFO) << "ForemanDistributed received typed '" << message_type
- << "' message from client " << annotated_message.sender;
+ DLOG(INFO) << "ForemanDistributed received " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Client " << annotated_message.sender;
switch (message_type) {
case kShiftbossRegistrationMessage: {
S::ShiftbossRegistrationMessage proto;
@@ -397,8 +396,7 @@ void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
free(proto_bytes);
const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
- DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
- << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+ DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage to Shiftboss with Client " << shiftboss_client_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
@@ -439,9 +437,8 @@ void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shi
shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
- DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
- << kShiftbossRegistrationResponseMessage
- << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+ DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage to Shiftboss with Client "
+ << shiftboss_client_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 3eac0ff..1501408 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -166,8 +166,7 @@ void ForemanSingleNode::run() {
// Signal the main thread that there are no queries to be executed.
// Currently the message doesn't have any real content.
TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
- DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
- << "') to CLI with TMB client ID " << main_thread_client_id_;
+ DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage to CLI with Client " << main_thread_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
@@ -222,8 +221,8 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
}
TaggedMessage worker_tagged_message(&message, sizeof(message), type);
- DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
- << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
+ DLOG(INFO) << "ForemanSingleNode sent " << QueryExecutionUtil::MessageTypeToString(type)
+ << " to Worker with Client " << worker_directory_->getClientID(worker_thread_index);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index e9faf8c..619e73f 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -231,8 +231,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
- << "') to all Shiftbosses";
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
shiftboss_addresses,
move(message),
@@ -287,9 +286,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
free(proto_bytes);
// Notify the CLI regarding the query result.
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
- << kQueryExecutionSuccessMessage
- << "') to CLI with TMB client id " << cli_id;
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage to DistributedCLI with Client " << cli_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -300,9 +297,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
}
// Notify the CLI query execution successfully.
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
- << kQueryExecutionSuccessMessage
- << "') to CLI with TMB client id " << cli_id;
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage to DistributedCLI with Client " << cli_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id,
TaggedMessage(kQueryExecutionSuccessMessage));
@@ -320,8 +315,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
free(proto_bytes);
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
- << "') to all Shiftbosses";
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
}
@@ -412,8 +406,7 @@ void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id c
TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
- << "') to CLI with TMB client id " << cli_id;
+ DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage to DistributedCLI with Client " << cli_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
CHECK(send_status == MessageBus::SendStatus::kOK);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index b41965c..3f74af3 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -22,10 +22,12 @@
#include <cstddef>
#include <memory>
+#include <string>
#include <utility>
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -49,6 +51,49 @@ class QueryHandle;
**/
class QueryExecutionUtil {
public:
+ static std::string MessageTypeToString(const tmb::message_type_id message_type) {
+ switch (message_type) {
+ case kAdmitRequestMessage: return "AdmitRequestMessage";
+ case kWorkOrderMessage: return "WorkOrderMessage";
+ case kWorkOrderCompleteMessage: return "WorkOrderCompleteMessage";
+ case kCatalogRelationNewBlockMessage: return "CatalogRelationNewBlockMessage";
+ case kDataPipelineMessage: return "DataPipelineMessage";
+ case kWorkOrderFeedbackMessage: return "WorkOrderFeedbackMessage";
+ case kRebuildWorkOrderMessage: return "RebuildWorkOrderMessage";
+ case kRebuildWorkOrderCompleteMessage: return "RebuildWorkOrderCompleteMessage";
+ case kWorkloadCompletionMessage: return "WorkloadCompletionMessage";
+ case kPoisonMessage: return "PoisonMessage";
+#ifdef QUICKSTEP_DISTRIBUTED
+ case kShiftbossRegistrationMessage: return "ShiftbossRegistrationMessage";
+ case kShiftbossRegistrationResponseMessage: return "ShiftbossRegistrationResponseMessage";
+ case kDistributedCliRegistrationMessage: return "DistributedCliRegistrationMessage";
+ case kDistributedCliRegistrationResponseMessage: return "DistributedCliRegistrationResponseMessage";
+ case kSqlQueryMessage: return "SqlQueryMessage";
+ case kQueryInitiateMessage: return "QueryInitiateMessage";
+ case kQueryInitiateResponseMessage: return "QueryInitiateResponseMessage";
+ case kInitiateRebuildMessage: return "InitiateRebuildMessage";
+ case kInitiateRebuildResponseMessage: return "InitiateRebuildResponseMessage";
+ case kQueryTeardownMessage: return "QueryTeardownMessage";
+ case kQueryExecutionSuccessMessage: return "QueryExecutionSuccessMessage";
+ case kCommandResponseMessage: return "CommandResponseMessage";
+ case kQueryExecutionErrorMessage: return "QueryExecutionErrorMessage";
+ case kQueryResultTeardownMessage: return "QueryResultTeardownMessage";
+ case kBlockDomainRegistrationMessage: return "BlockDomainRegistrationMessage";
+ case kBlockDomainRegistrationResponseMessage: return "BlockDomainRegistrationResponseMessage";
+ case kBlockDomainToShiftbossIndexMessage: return "BlockDomainToShiftbossIndexMessage";
+ case kAddBlockLocationMessage: return "AddBlockLocationMessage";
+ case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage";
+ case kLocateBlockMessage: return "LocateBlockMessage";
+ case kLocateBlockResponseMessage: return "LocateBlockResponseMessage";
+ case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage";
+ case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
+ case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage";
+#endif // QUICKSTEP_DISTRIBUTED
+ default:
+ LOG(FATAL) << "Unknown message type";
+ }
+ }
+
/**
* @brief Send a TMB message to a single receiver.
*
@@ -145,8 +190,7 @@ class QueryExecutionUtil {
address.All(true);
tmb::TaggedMessage poison_tagged_message(kPoisonMessage);
- DLOG(INFO) << "TMB client ID " << sender_id
- << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
+ DLOG(INFO) << "Client " << sender_id << " broadcasts PoisonMessage to all";
const tmb::MessageBus::SendStatus send_status = bus->Send(
sender_id, address, style, std::move(poison_tagged_message));
CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6c6f895..92645b6 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -182,8 +182,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
kInitiateRebuildMessage);
free(proto_bytes);
- DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
- << "') to all Shiftbosses";
+ DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage to all Shiftbosses";
QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
shiftboss_addresses_,
move(tagged_msg),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index fa922f0..d023d84 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -154,13 +154,15 @@ void Shiftboss::run() {
processShiftbossRegistrationResponseMessage();
+ AnnotatedMessage annotated_message;
+ tmb::message_type_id message_type;
for (;;) {
- AnnotatedMessage annotated_message;
if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) {
+ message_type = annotated_message.tagged_message.message_type();
DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') received the typed '" << annotated_message.tagged_message.message_type()
- << "' message from Foreman " << annotated_message.sender;
- switch (annotated_message.tagged_message.message_type()) {
+ << "') received " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Foreman with Client " << annotated_message.sender;
+ switch (message_type) {
case kQueryInitiateMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
@@ -192,9 +194,8 @@ void Shiftboss::run() {
kWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
- << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
- << "') from Foreman to worker " << worker_index;
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_
+ << " forwarded WorkOrderMessage from Foreman to Worker " << worker_index;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_local_,
@@ -228,9 +229,8 @@ void Shiftboss::run() {
break;
}
case kPoisonMessage: {
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') forwarded PoisonMessage (typed '" << kPoisonMessage
- << "') from Foreman to all workers";
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_
+ << " forwarded PoisonMessage from Foreman to all Workers";
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
@@ -248,16 +248,17 @@ void Shiftboss::run() {
}
while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) {
- switch (annotated_message.tagged_message.message_type()) {
+ message_type = annotated_message.tagged_message.message_type();
+ switch (message_type) {
case kCatalogRelationNewBlockMessage:
case kDataPipelineMessage:
case kWorkOrderFeedbackMessage:
case kWorkOrderCompleteMessage:
case kRebuildWorkOrderCompleteMessage: {
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') forwarded typed '" << annotated_message.tagged_message.message_type()
- << "' message from Worker with TMB client ID '" << annotated_message.sender
- << "' to Foreman with TMB client ID " << foreman_client_id_;
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_
+ << " forwarded " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " from Worker with Client " << annotated_message.sender
+ << " to Foreman with Client " << foreman_client_id_;
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
const MessageBus::SendStatus send_status =
@@ -310,9 +311,8 @@ void Shiftboss::registerWithForeman() {
kShiftbossRegistrationMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
- << "') to all";
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_
+ << " sent ShiftbossRegistrationMessage to all";
tmb::MessageBus::SendStatus send_status =
bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
@@ -324,9 +324,8 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
foreman_client_id_ = annotated_message.sender;
- DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_
- << "') received the typed '" << kShiftbossRegistrationResponseMessage
- << "' message from ForemanDistributed with client " << foreman_client_id_;
+ DLOG(INFO) << "Shiftboss with Client " << shiftboss_client_id_local_
+ << " received ShiftbossRegistrationResponseMessage from Foreman with Client " << foreman_client_id_;
serialization::ShiftbossRegistrationResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -363,9 +362,8 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
- << "') to Foreman with TMB client ID " << foreman_client_id_;
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_
+ << " sent QueryInitiateResponseMessage to Foreman with Client " << foreman_client_id_;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_global_,
shiftboss_client_id_global_,
@@ -402,9 +400,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
- << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
- << "') to Foreman with TMB client ID " << foreman_client_id_;
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_
+ << " sent InitiateRebuildResponseMessage to Foreman with Client " << foreman_client_id_;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_global_,
shiftboss_client_id_global_,
@@ -431,9 +428,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kRebuildWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
- << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
- << "') to worker " << worker_index;
+ DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_
+ << " sent RebuildWorkOrderMessage to Worker " << worker_index;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_local_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 9a548fd..1882f2e 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -111,11 +111,8 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
#ifdef QUICKSTEP_DISTRIBUTED
<< " in Shiftboss " << shiftboss_index_
#endif // QUICKSTEP_DISTRIBUTED
- << " sent "
- << (message_type == kWorkOrderCompleteMessage ? "WorkOrderCompleteMessage"
- : "RebuildWorkOrderCompleteMessage")
- << " (typed '" << message_type
- << "') to Scheduler with TMB client ID " << receiver;
+ << " sent " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " to Scheduler with Client " << receiver;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_, worker_client_id_, receiver, std::move(tagged_message));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 32437c3..426a2c9 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -97,9 +97,8 @@ class BlockLocatorTest : public ::testing::Test {
TaggedMessage message(kPoisonMessage);
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent PoisonMessage (typed '" << kPoisonMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
+ LOG(INFO) << "Worker with Client " << worker_client_id_ << " sent PoisonMessage to BlockLocator with Client "
+ << locator_client_id_;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_,
worker_client_id_,
@@ -120,9 +119,7 @@ class BlockLocatorTest : public ::testing::Test {
kLocateBlockMessage);
free(proto_bytes);
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage
- << "') to BlockLocator";
+ LOG(INFO) << "Worker wth Client " << worker_client_id_ << " sent LocateBlockMessage to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_,
worker_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 24da9bf..14cbf6f 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -146,8 +146,7 @@ void DeleteWorkOrder::execute() {
kDataPipelineMessage);
std::free(proto_bytes);
- DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 2cef1f1..7f0f7fc 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -101,8 +101,7 @@ class RebuildWorkOrder : public WorkOrder {
// Refer to InsertDestination::sendBlockFilledMessage for the rationale
// behind using the ClientIDMap map.
- DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
ClientIDMap::Instance()->getValue(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 143c741..40dfb22 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -128,8 +128,7 @@ void UpdateWorkOrder::execute() {
kDataPipelineMessage);
std::free(proto_bytes);
- DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index c1b9b68..97f2a74 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -283,8 +283,7 @@ class WorkOrder {
tmb::MessageStyle single_receiver_style;
DCHECK(bus != nullptr);
- DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage
- << "') to Scheduler with TMB client ID " << receiver_id;
+ DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage to Scheduler with Client " << receiver_id;
const tmb::MessageBus::SendStatus send_status =
bus->Send(sender_id,
receiver_address,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 714e6e5..75e1217 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -360,9 +360,8 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
kCatalogRelationNewBlockMessage);
free(proto_bytes);
- DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '"
- << kCatalogRelationNewBlockMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client "
+ << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
@@ -410,9 +409,8 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
kCatalogRelationNewBlockMessage);
free(proto_bytes);
- DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '"
- << kCatalogRelationNewBlockMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client "
+ << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
@@ -527,9 +525,8 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
kCatalogRelationNewBlockMessage);
free(proto_bytes);
- DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '"
- << kCatalogRelationNewBlockMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client "
+ << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 6707192..e9335ce 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -255,8 +255,7 @@ class InsertDestination : public InsertDestinationInterface {
// option 3.
DCHECK(bus_ != nullptr);
- DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage
- << "') to Scheduler with TMB client ID " << scheduler_client_id_;
+ DLOG(INFO) << "InsertDestination sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 94e1b67..4410385 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -256,9 +256,8 @@ StorageManager::~StorageManager() {
kBlockDomainUnregistrationMessage);
free(proto_bytes);
- LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage
- << "') to BlockLocator";
+ LOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " sent BlockDomainUnregistrationMessage to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
storage_manager_client_id_,
@@ -483,9 +482,8 @@ void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t sh
kBlockDomainToShiftbossIndexMessage);
free(proto_bytes);
- DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " sent BlockDomainToShiftbossIndexMessage to BlockLocator";
DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
DCHECK(bus_ != nullptr);
@@ -592,9 +590,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
kGetPeerDomainNetworkAddressesMessage);
free(proto_bytes);
- DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " sent GetPeerDomainNetworkAddressesMessage to BlockLocator";
DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
DCHECK(bus_ != nullptr);
@@ -648,9 +645,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
message_type);
free(proto_bytes);
- DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
- << "') sent BlockLocationMessage (typed '" << message_type
- << "') to BlockLocator";
+ DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " sent " << QueryExecutionUtil::MessageTypeToString(message_type)
+ << " to BlockLocator";
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(bus_,
storage_manager_client_id_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index ac39728..d1fdef6 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -113,9 +113,8 @@ class DataExchangeTest : public ::testing::Test {
TaggedMessage message(kPoisonMessage);
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent PoisonMessage (typed '" << kPoisonMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
+ LOG(INFO) << "Worker with Client " << worker_client_id_
+ << " sent PoisonMessage to BlockLocator with Client " << locator_client_id_;
CHECK(MessageBus::SendStatus::kOK ==
QueryExecutionUtil::SendTMBMessage(&bus_,
worker_client_id_,