You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/09 05:50:47 UTC

incubator-quickstep git commit: Use glog for all sent TMB messages.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/glog-tmb-msgs [created] 48f634c99


Use glog for all sent TMB messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/48f634c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/48f634c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/48f634c9

Branch: refs/heads/glog-tmb-msgs
Commit: 48f634c99563583982c94c203fd81bafad99dd4b
Parents: 658cb61
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 22:49:59 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 22:49:59 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp         | 14 +++---
 query_execution/PolicyEnforcerDistributed.cpp | 27 ++++-------
 query_execution/QueryExecutionUtil.hpp        |  6 +--
 query_execution/Shiftboss.cpp                 | 54 ++++++++--------------
 query_execution/Worker.cpp                    |  6 +--
 relational_operators/DeleteOperator.cpp       | 10 ++--
 relational_operators/RebuildWorkOrder.hpp     |  7 ++-
 relational_operators/UpdateOperator.cpp       | 10 ++--
 relational_operators/WorkOrder.hpp            |  7 ++-
 storage/InsertDestination.cpp                 | 18 +++++---
 storage/InsertDestination.hpp                 | 10 ++--
 11 files changed, 70 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 23db379..44e3b0d 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -170,16 +170,15 @@ void ForemanSingleNode::run() {
       // Signal the main thread that there are no queries to be executed.
       // Currently the message doesn't have any real content.
       TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
+      LOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
+                << "') to CLI with TMB client ID " << main_thread_client_id_;
       const tmb::MessageBus::SendStatus send_status =
           QueryExecutionUtil::SendTMBMessage(
               bus_,
               foreman_client_id_,
               main_thread_client_id_,
               move(completion_tagged_message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-          << "Message could not be sent from Foreman with TMB client ID "
-          << foreman_client_id_ << " to main thread with TMB client ID"
-          << main_thread_client_id_;
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     }
   }
 }
@@ -227,15 +226,14 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
   }
   TaggedMessage worker_tagged_message(&message, sizeof(message), type);
 
+  LOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
+            << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          worker_directory_->getClientID(worker_thread_index),
                                          move(worker_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-      "Message could not be sent from Foreman with TMB client ID "
-      << foreman_client_id_ << " to Foreman with TMB client ID "
-      << worker_directory_->getClientID(worker_thread_index);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 const std::vector<WorkOrderTimeEntry>& ForemanSingleNode

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6d0de47..7b83caa 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -170,18 +170,15 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
                         kQueryInitiateMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
-            << "') to Shiftboss 0";
-
   // TODO(zuyu): Multiple Shiftbosses support.
+  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+            << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   // Wait Shiftboss for QueryInitiateResponseMessage.
   const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
@@ -216,15 +213,13 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
 
     // TODO(zuyu): Support multiple shiftbosses.
     LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-              << "') to Shiftboss 0";
+              << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
     tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            shiftboss_directory_->getClientId(0),
                                            move(message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to Shiftboss";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
@@ -236,9 +231,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                                            foreman_client_id_,
                                            cli_id,
                                            move(cli_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to CLI with TMB client ID " << cli_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     return;
   }
 
@@ -263,17 +256,15 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                         kSaveQueryResultMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
-            << "') to Shiftboss 0";
   // TODO(zuyu): Support multiple shiftbosses.
+  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+            << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 906fb6b..0ebf064 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -128,11 +128,11 @@ class QueryExecutionUtil {
     address.All(true);
     TaggedMessage poison_tagged_message(kPoisonMessage);
 
+    LOG(INFO) << "TMB client ID " << sender_id
+              << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
     const tmb::MessageBus::SendStatus send_status = bus->Send(
         sender_id, address, style, std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-       "Broadcast poison message from sender with TMB client ID " << sender_id
-       << " failed";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 24c91fe..78cd882 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -130,9 +130,7 @@ void Shiftboss::run() {
                                                shiftboss_client_id_,
                                                workers_->getClientID(worker_index),
                                                move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kInitiateRebuildMessage: {
@@ -156,7 +154,8 @@ void Shiftboss::run() {
       case kWorkOrderFeedbackMessage: {
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+                  << "' message from Worker with TMB client ID '" << annotated_message.sender
+                  << "' to Foreman with TMB client ID " << foreman_client_id_;
 
         DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
         const MessageBus::SendStatus send_status =
@@ -164,9 +163,7 @@ void Shiftboss::run() {
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kSaveQueryResultMessage: {
@@ -193,15 +190,13 @@ void Shiftboss::run() {
 
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                  << "') to Foreman";
+                  << "') to Foreman with TMB client ID " << foreman_client_id_;
         const MessageBus::SendStatus send_status =
             QueryExecutionUtil::SendTMBMessage(bus_,
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kPoisonMessage: {
@@ -217,9 +212,7 @@ void Shiftboss::run() {
                        worker_addresses_,
                        broadcast_style,
                        move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to All workers";
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         return;
       }
       default: {
@@ -246,10 +239,6 @@ size_t Shiftboss::getSchedulableWorker() {
 }
 
 void Shiftboss::registerWithForeman() {
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
-            << "') to all";
-
   tmb::Address all_addresses;
   all_addresses.All(true);
 
@@ -267,6 +256,9 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+            << "') to all";
   tmb::MessageBus::SendStatus send_status =
       bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
@@ -286,10 +278,6 @@ void Shiftboss::processQueryInitiateMessage(
                        bus_));
   query_contexts_.emplace(query_id, move(query_context));
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-            << "') to Foreman";
-
   serialization::QueryInitiateResponseMessage proto;
   proto.set_query_id(query_id);
 
@@ -302,14 +290,15 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+            << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -325,10 +314,6 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   vector<MutableBlockReference> partially_filled_block_refs;
   insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
-            << "') to Foreman";
-
   serialization::InitiateRebuildResponseMessage proto;
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
@@ -344,14 +329,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+            << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of
@@ -381,9 +367,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                            shiftboss_client_id_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
-    CHECK(send_status == MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-        << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+    CHECK(send_status == MessageBus::SendStatus::kOK);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index d497be6..3f28999 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -101,12 +101,12 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
       static_cast<const void *>(proto_bytes), proto_length, message_type);
   std::free(proto_bytes);
 
+  LOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
+            << "') to Scheduler with TMB client ID " << receiver;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_, worker_client_id_, receiver, std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from worker with TMB client ID "
-      << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 template <typename CompletionMessageProtoT>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 8197aef..ec26a3b 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -146,17 +146,15 @@ void DeleteWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  LOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+            << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index fe4be68..1f1b019 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -101,15 +101,14 @@ class RebuildWorkOrder : public WorkOrder {
 
     // Refer to InsertDestination::sendBlockFilledMessage for the rationale
     // behind using the ClientIDMap map.
+    LOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+              << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            ClientIDMap::Instance()->getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could "
-        " not be sent from thread with TMB client ID " <<
-        ClientIDMap::Instance()->getValue() << " to Foreman with TMB client ID "
-        << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index bc29365..426d011 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -128,17 +128,15 @@ void UpdateWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  LOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+            << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 3cbab94..e241bec 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -44,7 +44,6 @@ namespace quickstep {
  *  @{
  */
 
-
 /**
  * @brief A single unit of work in a query plan, produced by a
  *        RelationalOperator. Where possible, WorkOrders should be of
@@ -284,14 +283,14 @@ class WorkOrder {
     tmb::MessageStyle single_receiver_style;
 
     DCHECK(bus != nullptr);
+    LOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage
+              << "') to Scheduler with TMB client ID " << receiver_id;
     const tmb::MessageBus::SendStatus send_status =
         bus->Send(sender_id,
                   receiver_address,
                   single_receiver_style,
                   std::move(msg));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could"
-        " not be sent from thread with TMB client ID " << sender_id << " to"
-        " receiver thread with TMB client ID " << receiver_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 9897aed..61cbcf6 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -282,13 +282,15 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+            << kCatalogRelationNewBlockMessage
+            << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -330,13 +332,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+            << kCatalogRelationNewBlockMessage
+            << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -445,13 +449,15 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  LOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+            << kCatalogRelationNewBlockMessage
+            << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/48f634c9/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3dae9a0..bc18d07 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -249,16 +249,14 @@ class InsertDestination : public InsertDestinationInterface {
     // option 3.
     DCHECK(bus_ != nullptr);
 
-    const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+    LOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage
+              << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
-                                           worker_thread_client_id,
+                                           thread_id_map_.getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-        "Message could not be sent from thread with TMB client ID "
-        << worker_thread_client_id << " to Scheduler with TMB client"
-        " ID " << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   inline const std::size_t getQueryID() const {