You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:47 UTC

[07/16] incubator-quickstep git commit: Logged all sent messages using glog.

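Logged all sent messages using glog: each message send touched by this change is now preceded by a DLOG(INFO) entry naming the message type and the receiver's TMB client ID, the existing LOG(INFO) calls were converted to DLOG(INFO), and the failure text previously streamed into the CHECK on the send status was removed.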


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d9135a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d9135a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d9135a8a

Branch: refs/heads/LIP-for-tpch
Commit: d9135a8a2d11a1eabf6705c88391c498f4be38bb
Parents: 6168996
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 22:49:59 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 19:48:01 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp         | 14 ++--
 query_execution/PolicyEnforcerDistributed.cpp | 38 ++++------
 query_execution/QueryExecutionUtil.hpp        |  6 +-
 query_execution/Shiftboss.cpp                 | 86 +++++++++-------------
 query_execution/Worker.cpp                    |  6 +-
 relational_operators/DeleteOperator.cpp       | 10 +--
 relational_operators/RebuildWorkOrder.hpp     |  7 +-
 relational_operators/UpdateOperator.cpp       | 10 +--
 relational_operators/WorkOrder.hpp            |  7 +-
 storage/InsertDestination.cpp                 | 18 +++--
 storage/InsertDestination.hpp                 | 10 +--
 11 files changed, 92 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index d064a6f..7596b00 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -168,16 +168,15 @@ void ForemanSingleNode::run() {
       // Signal the main thread that there are no queries to be executed.
       // Currently the message doesn't have any real content.
       TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
+      DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
+                 << "') to CLI with TMB client ID " << main_thread_client_id_;
       const tmb::MessageBus::SendStatus send_status =
           QueryExecutionUtil::SendTMBMessage(
               bus_,
               foreman_client_id_,
               main_thread_client_id_,
               move(completion_tagged_message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-          << "Message could not be sent from Foreman with TMB client ID "
-          << foreman_client_id_ << " to main thread with TMB client ID"
-          << main_thread_client_id_;
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     }
   }
 }
@@ -225,15 +224,14 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
   }
   TaggedMessage worker_tagged_message(&message, sizeof(message), type);
 
+  DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
+             << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          worker_directory_->getClientID(worker_thread_index),
                                          move(worker_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-      "Message could not be sent from Foreman with TMB client ID "
-      << foreman_client_id_ << " to Foreman with TMB client ID "
-      << worker_directory_->getClientID(worker_thread_index);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 const std::vector<WorkOrderTimeEntry>& ForemanSingleNode

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6d0de47..c76a9e1 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -170,25 +170,22 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
                         kQueryInitiateMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
-            << "') to Shiftboss 0";
-
   // TODO(zuyu): Multiple Shiftbosses support.
+  DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   // Wait Shiftboss for QueryInitiateResponseMessage.
   const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
-  LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
-            << "' message from client " << annotated_message.sender;
+  DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+             << "' message from client " << annotated_message.sender;
 
   S::QueryInitiateResponseMessage proto_response;
   CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -215,30 +212,27 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                           kQueryTeardownMessage);
 
     // TODO(zuyu): Support multiple shiftbosses.
-    LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-              << "') to Shiftboss 0";
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+               << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
     tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            shiftboss_directory_->getClientId(0),
                                            move(message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to Shiftboss";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
     // Notify the CLI query execution successfully.
-    LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
-              << "') to CLI with TMB client id " << cli_id;
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
+               << kQueryExecutionSuccessMessage
+               << "') to CLI with TMB client id " << cli_id;
     send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            cli_id,
                                            move(cli_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to CLI with TMB client ID " << cli_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     return;
   }
 
@@ -263,17 +257,15 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                         kSaveQueryResultMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
-            << "') to Shiftboss 0";
   // TODO(zuyu): Support multiple shiftbosses.
+  DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 906fb6b..feb4cc0 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -128,11 +128,11 @@ class QueryExecutionUtil {
     address.All(true);
     TaggedMessage poison_tagged_message(kPoisonMessage);
 
+    DLOG(INFO) << "TMB client ID " << sender_id
+               << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
     const tmb::MessageBus::SendStatus send_status = bus->Send(
         sender_id, address, style, std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-       "Broadcast poison message from sender with TMB client ID " << sender_id
-       << " failed";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index bd83dd4..ddfd47f 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -73,9 +73,9 @@ void Shiftboss::run() {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
     AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-              << "') received the typed '" << annotated_message.tagged_message.message_type()
-              << "' message from client " << annotated_message.sender;
+    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+               << "') received the typed '" << annotated_message.tagged_message.message_type()
+               << "' message from client " << annotated_message.sender;
     switch (annotated_message.tagged_message.message_type()) {
       case kShiftbossRegistrationResponseMessage: {
         foreman_client_id_ = annotated_message.sender;
@@ -121,18 +121,16 @@ void Shiftboss::run() {
                                             kWorkOrderMessage);
 
         const size_t worker_index = getSchedulableWorker();
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                  << "') from Foreman to worker " << worker_index;
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                   << "') from Foreman to worker " << worker_index;
 
         const MessageBus::SendStatus send_status =
             QueryExecutionUtil::SendTMBMessage(bus_,
                                                shiftboss_client_id_,
                                                workers_->getClientID(worker_index),
                                                move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kInitiateRebuildMessage: {
@@ -153,9 +151,10 @@ void Shiftboss::run() {
       case kRebuildWorkOrderCompleteMessage:
       case kDataPipelineMessage:
       case kWorkOrderFeedbackMessage: {
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                   << "' message from Worker with TMB client ID '" << annotated_message.sender
+                   << "' to Foreman with TMB client ID " << foreman_client_id_;
 
         DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
         const MessageBus::SendStatus send_status =
@@ -163,9 +162,7 @@ void Shiftboss::run() {
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kSaveQueryResultMessage: {
@@ -190,23 +187,21 @@ void Shiftboss::run() {
                                        kSaveQueryResultResponseMessage);
         free(proto_response_bytes);
 
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                  << "') to Foreman";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                   << "') to Foreman with TMB client ID " << foreman_client_id_;
         const MessageBus::SendStatus send_status =
             QueryExecutionUtil::SendTMBMessage(bus_,
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kPoisonMessage: {
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                  << "') from Foreman to all workers";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                   << "') from Foreman to all workers";
 
         tmb::MessageStyle broadcast_style;
         broadcast_style.Broadcast(true);
@@ -216,9 +211,7 @@ void Shiftboss::run() {
                        worker_addresses_,
                        broadcast_style,
                        move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to All workers";
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         return;
       }
       default: {
@@ -245,10 +238,6 @@ size_t Shiftboss::getSchedulableWorker() {
 }
 
 void Shiftboss::registerWithForeman() {
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
-            << "') to all";
-
   tmb::Address all_addresses;
   all_addresses.All(true);
 
@@ -266,6 +255,9 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+             << "') to all";
   tmb::MessageBus::SendStatus send_status =
       bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
@@ -285,10 +277,6 @@ void Shiftboss::processQueryInitiateMessage(
                        bus_));
   query_contexts_.emplace(query_id, move(query_context));
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-            << "') to Foreman";
-
   serialization::QueryInitiateResponseMessage proto;
   proto.set_query_id(query_id);
 
@@ -301,14 +289,15 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+             << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -324,10 +313,6 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   vector<MutableBlockReference> partially_filled_block_refs;
   insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
-            << "') to Foreman";
-
   serialization::InitiateRebuildResponseMessage proto;
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
@@ -343,14 +328,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+             << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of
@@ -371,18 +357,16 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
-              << "') to worker " << worker_index;
+    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+               << "') to worker " << worker_index;
 
     const MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            shiftboss_client_id_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
-    CHECK(send_status == MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-        << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+    CHECK(send_status == MessageBus::SendStatus::kOK);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index d497be6..0b1efba 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -101,12 +101,12 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
       static_cast<const void *>(proto_bytes), proto_length, message_type);
   std::free(proto_bytes);
 
+  DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
+             << "') to Scheduler with TMB client ID " << receiver;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_, worker_client_id_, receiver, std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from worker with TMB client ID "
-      << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 template <typename CompletionMessageProtoT>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 8197aef..24da9bf 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -146,17 +146,15 @@ void DeleteWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index fe4be68..2cef1f1 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -101,15 +101,14 @@ class RebuildWorkOrder : public WorkOrder {
 
     // Refer to InsertDestination::sendBlockFilledMessage for the rationale
     // behind using the ClientIDMap map.
+    DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+               << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            ClientIDMap::Instance()->getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could "
-        " not be sent from thread with TMB client ID " <<
-        ClientIDMap::Instance()->getValue() << " to Foreman with TMB client ID "
-        << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index bc29365..143c741 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -128,17 +128,15 @@ void UpdateWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 3cbab94..c1b9b68 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -44,7 +44,6 @@ namespace quickstep {
  *  @{
  */
 
-
 /**
  * @brief A single unit of work in a query plan, produced by a
  *        RelationalOperator. Where possible, WorkOrders should be of
@@ -284,14 +283,14 @@ class WorkOrder {
     tmb::MessageStyle single_receiver_style;
 
     DCHECK(bus != nullptr);
+    DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage
+               << "') to Scheduler with TMB client ID " << receiver_id;
     const tmb::MessageBus::SendStatus send_status =
         bus->Send(sender_id,
                   receiver_address,
                   single_receiver_style,
                   std::move(msg));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could"
-        " not be sent from thread with TMB client ID " << sender_id << " to"
-        " receiver thread with TMB client ID " << receiver_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 9897aed..5e83453 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -282,13 +282,15 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -330,13 +332,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -445,13 +449,15 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3dae9a0..408e76b 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -249,16 +249,14 @@ class InsertDestination : public InsertDestinationInterface {
     // option 3.
     DCHECK(bus_ != nullptr);
 
-    const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+    DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage
+               << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
-                                           worker_thread_client_id,
+                                           thread_id_map_.getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-        "Message could not be sent from thread with TMB client ID "
-        << worker_thread_client_id << " to Scheduler with TMB client"
-        " ID " << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   inline const std::size_t getQueryID() const {