You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:47 UTC
[07/16] incubator-quickstep git commit: Logged all sent messages using glog.
Logged all sent messages using glog.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d9135a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d9135a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d9135a8a
Branch: refs/heads/LIP-for-tpch
Commit: d9135a8a2d11a1eabf6705c88391c498f4be38bb
Parents: 6168996
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 22:49:59 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 19:48:01 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanSingleNode.cpp | 14 ++--
query_execution/PolicyEnforcerDistributed.cpp | 38 ++++------
query_execution/QueryExecutionUtil.hpp | 6 +-
query_execution/Shiftboss.cpp | 86 +++++++++-------------
query_execution/Worker.cpp | 6 +-
relational_operators/DeleteOperator.cpp | 10 +--
relational_operators/RebuildWorkOrder.hpp | 7 +-
relational_operators/UpdateOperator.cpp | 10 +--
relational_operators/WorkOrder.hpp | 7 +-
storage/InsertDestination.cpp | 18 +++--
storage/InsertDestination.hpp | 10 +--
11 files changed, 92 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index d064a6f..7596b00 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -168,16 +168,15 @@ void ForemanSingleNode::run() {
// Signal the main thread that there are no queries to be executed.
// Currently the message doesn't have any real content.
TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
+ DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
+ << "') to CLI with TMB client ID " << main_thread_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
foreman_client_id_,
main_thread_client_id_,
move(completion_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to main thread with TMB client ID"
- << main_thread_client_id_;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
}
}
@@ -225,15 +224,14 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
}
TaggedMessage worker_tagged_message(&message, sizeof(message), type);
+ DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
+ << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
worker_directory_->getClientID(worker_thread_index),
move(worker_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Message could not be sent from Foreman with TMB client ID "
- << foreman_client_id_ << " to Foreman with TMB client ID "
- << worker_directory_->getClientID(worker_thread_index);
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
const std::vector<WorkOrderTimeEntry>& ForemanSingleNode
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6d0de47..c76a9e1 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -170,25 +170,22 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
kQueryInitiateMessage);
free(proto_bytes);
- LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
- << "') to Shiftboss 0";
-
// TODO(zuyu): Multiple Shiftbosses support.
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+ << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
shiftboss_directory_->getClientId(0),
move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
- << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
// Wait Shiftboss for QueryInitiateResponseMessage.
const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
- LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
+ DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
S::QueryInitiateResponseMessage proto_response;
CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -215,30 +212,27 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
kQueryTeardownMessage);
// TODO(zuyu): Support multiple shiftbosses.
- LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
- << "') to Shiftboss 0";
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
shiftboss_directory_->getClientId(0),
move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
- << " to Shiftboss";
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
TaggedMessage cli_message(kQueryExecutionSuccessMessage);
// Notify the CLI query execution successfully.
- LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
- << "') to CLI with TMB client id " << cli_id;
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
+ << kQueryExecutionSuccessMessage
+ << "') to CLI with TMB client id " << cli_id;
send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
cli_id,
move(cli_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
- << " to CLI with TMB client ID " << cli_id;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
return;
}
@@ -263,17 +257,15 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
kSaveQueryResultMessage);
free(proto_bytes);
- LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
- << "') to Shiftboss 0";
// TODO(zuyu): Support multiple shiftbosses.
+ DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+ << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
shiftboss_directory_->getClientId(0),
move(message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
- << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 906fb6b..feb4cc0 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -128,11 +128,11 @@ class QueryExecutionUtil {
address.All(true);
TaggedMessage poison_tagged_message(kPoisonMessage);
+ DLOG(INFO) << "TMB client ID " << sender_id
+ << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
const tmb::MessageBus::SendStatus send_status = bus->Send(
sender_id, address, style, std::move(poison_tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Broadcast poison message from sender with TMB client ID " << sender_id
- << " failed";
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
private:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index bd83dd4..ddfd47f 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -73,9 +73,9 @@ void Shiftboss::run() {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') received the typed '" << annotated_message.tagged_message.message_type()
- << "' message from client " << annotated_message.sender;
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') received the typed '" << annotated_message.tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
switch (annotated_message.tagged_message.message_type()) {
case kShiftbossRegistrationResponseMessage: {
foreman_client_id_ = annotated_message.sender;
@@ -121,18 +121,16 @@ void Shiftboss::run() {
kWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
- << "') from Foreman to worker " << worker_index;
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+ << "') from Foreman to worker " << worker_index;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
workers_->getClientID(worker_index),
move(worker_tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+ CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
case kInitiateRebuildMessage: {
@@ -153,9 +151,10 @@ void Shiftboss::run() {
case kRebuildWorkOrderCompleteMessage:
case kDataPipelineMessage:
case kWorkOrderFeedbackMessage: {
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') forwarded typed '" << annotated_message.tagged_message.message_type()
- << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+ << "' message from Worker with TMB client ID '" << annotated_message.sender
+ << "' to Foreman with TMB client ID " << foreman_client_id_;
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
const MessageBus::SendStatus send_status =
@@ -163,9 +162,7 @@ void Shiftboss::run() {
shiftboss_client_id_,
foreman_client_id_,
move(annotated_message.tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Foreman with TMB client ID " << foreman_client_id_;
+ CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
case kSaveQueryResultMessage: {
@@ -190,23 +187,21 @@ void Shiftboss::run() {
kSaveQueryResultResponseMessage);
free(proto_response_bytes);
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
- << "') to Foreman";
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+ << "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
move(message_response));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Foreman with TMB client ID " << foreman_client_id_;
+ CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
case kPoisonMessage: {
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') forwarded PoisonMessage (typed '" << kPoisonMessage
- << "') from Foreman to all workers";
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+ << "') from Foreman to all workers";
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
@@ -216,9 +211,7 @@ void Shiftboss::run() {
worker_addresses_,
broadcast_style,
move(annotated_message.tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to All workers";
+ CHECK(send_status == MessageBus::SendStatus::kOK);
return;
}
default: {
@@ -245,10 +238,6 @@ size_t Shiftboss::getSchedulableWorker() {
}
void Shiftboss::registerWithForeman() {
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
- << "') to all";
-
tmb::Address all_addresses;
all_addresses.All(true);
@@ -266,6 +255,9 @@ void Shiftboss::registerWithForeman() {
kShiftbossRegistrationMessage);
free(proto_bytes);
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+ << "') to all";
tmb::MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
@@ -285,10 +277,6 @@ void Shiftboss::processQueryInitiateMessage(
bus_));
query_contexts_.emplace(query_id, move(query_context));
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
- << "') to Foreman";
-
serialization::QueryInitiateResponseMessage proto;
proto.set_query_id(query_id);
@@ -301,14 +289,15 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+ << "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
move(message_response));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Foreman with TMB client ID " << foreman_client_id_;
+ CHECK(send_status == MessageBus::SendStatus::kOK);
}
void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -324,10 +313,6 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
vector<MutableBlockReference> partially_filled_block_refs;
insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
- << "') to Foreman";
-
serialization::InitiateRebuildResponseMessage proto;
proto.set_query_id(query_id);
proto.set_operator_index(op_index);
@@ -343,14 +328,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+ << "') to Foreman with TMB client ID " << foreman_client_id_;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
move(message_response));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Foreman with TMB client ID " << foreman_client_id_;
+ CHECK(send_status == MessageBus::SendStatus::kOK);
for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
// NOTE(zuyu): Worker releases the memory after the execution of
@@ -371,18 +357,16 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kRebuildWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
- << "') to worker " << worker_index;
+ DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+ << "') to worker " << worker_index;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
workers_->getClientID(worker_index),
move(worker_tagged_message));
- CHECK(send_status == MessageBus::SendStatus::kOK)
- << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
- << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+ CHECK(send_status == MessageBus::SendStatus::kOK);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index d497be6..0b1efba 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -101,12 +101,12 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
static_cast<const void *>(proto_bytes), proto_length, message_type);
std::free(proto_bytes);
+ DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
+ << "') to Scheduler with TMB client ID " << receiver;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_, worker_client_id_, receiver, std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "Message could not be sent from worker with TMB client ID "
- << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
template <typename CompletionMessageProtoT>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 8197aef..24da9bf 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -146,17 +146,15 @@ void DeleteWorkOrder::execute() {
kDataPipelineMessage);
std::free(proto_bytes);
- const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+ DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
- worker_thread_client_id,
+ ClientIDMap::Instance()->getValue(),
scheduler_client_id_,
std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
- " be sent from thread with TMB client ID " <<
- worker_thread_client_id << " to Foreman with TMB client ID "
- << scheduler_client_id_;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index fe4be68..2cef1f1 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -101,15 +101,14 @@ class RebuildWorkOrder : public WorkOrder {
// Refer to InsertDestination::sendBlockFilledMessage for the rationale
// behind using the ClientIDMap map.
+ DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
ClientIDMap::Instance()->getValue(),
scheduler_client_id_,
std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could "
- " not be sent from thread with TMB client ID " <<
- ClientIDMap::Instance()->getValue() << " to Foreman with TMB client ID "
- << scheduler_client_id_;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
private:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index bc29365..143c741 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -128,17 +128,15 @@ void UpdateWorkOrder::execute() {
kDataPipelineMessage);
std::free(proto_bytes);
- const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+ DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
- worker_thread_client_id,
+ ClientIDMap::Instance()->getValue(),
scheduler_client_id_,
std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
- " be sent from thread with TMB client ID " <<
- worker_thread_client_id << " to Foreman with TMB client ID "
- << scheduler_client_id_;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 3cbab94..c1b9b68 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -44,7 +44,6 @@ namespace quickstep {
* @{
*/
-
/**
* @brief A single unit of work in a query plan, produced by a
* RelationalOperator. Where possible, WorkOrders should be of
@@ -284,14 +283,14 @@ class WorkOrder {
tmb::MessageStyle single_receiver_style;
DCHECK(bus != nullptr);
+ DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage
+ << "') to Scheduler with TMB client ID " << receiver_id;
const tmb::MessageBus::SendStatus send_status =
bus->Send(sender_id,
receiver_address,
single_receiver_style,
std::move(msg));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could"
- " not be sent from thread with TMB client ID " << sender_id << " to"
- " receiver thread with TMB client ID " << receiver_id;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 9897aed..5e83453 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -282,13 +282,15 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
kCatalogRelationNewBlockMessage);
free(proto_bytes);
+ DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+ << kCatalogRelationNewBlockMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
scheduler_client_id_,
move(tagged_msg));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
return storage_manager_->getBlockMutable(new_id, relation_);
}
@@ -330,13 +332,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
kCatalogRelationNewBlockMessage);
free(proto_bytes);
+ DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+ << kCatalogRelationNewBlockMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
scheduler_client_id_,
move(tagged_msg));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
return storage_manager_->getBlockMutable(new_id, relation_);
}
@@ -445,13 +449,15 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
kCatalogRelationNewBlockMessage);
free(proto_bytes);
+ DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+ << kCatalogRelationNewBlockMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
thread_id_map_.getValue(),
scheduler_client_id_,
move(tagged_msg));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
- << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
return storage_manager_->getBlockMutable(new_id, relation_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3dae9a0..408e76b 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -249,16 +249,14 @@ class InsertDestination : public InsertDestinationInterface {
// option 3.
DCHECK(bus_ != nullptr);
- const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+ DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage
+ << "') to Scheduler with TMB client ID " << scheduler_client_id_;
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
- worker_thread_client_id,
+ thread_id_map_.getValue(),
scheduler_client_id_,
std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
- "Message could not be sent from thread with TMB client ID "
- << worker_thread_client_id << " to Scheduler with TMB client"
- " ID " << scheduler_client_id_;
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
inline const std::size_t getQueryID() const {