You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/10 18:35:01 UTC

[1/5] incubator-quickstep git commit: Deserialized Window Aggr WorkOrder. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-q-opt-exe-gen-test 41a5bceaa -> 5f450d7c0 (forced update)


Deserialized Window Aggr WorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/85e02de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/85e02de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/85e02de4

Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: 85e02de49205409accfef3737dadfe95aad1f5c0
Parents: 658cb61
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 23:14:08 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 23:14:08 2016 -0700

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt       |  1 +
 relational_operators/WorkOrderFactory.cpp | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 9696392..43a42f9 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -467,6 +467,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UpdateOperator
+                      quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index f920cac..721d735 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -46,6 +46,7 @@
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
 #include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WindowAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
 
@@ -419,6 +420,22 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           shiftboss_client_id,
           bus);
     }
+    case serialization::WINDOW_AGGREGATION: {
+      LOG(INFO) << "Creating WindowAggregationWorkOrder";
+      vector<block_id> blocks;
+      for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
+        blocks.push_back(
+            proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i));
+      }
+
+      return new WindowAggregationWorkOrder(
+          proto.query_id(),
+          query_context->getWindowAggregationState(
+              proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
+          move(blocks),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+    }
     default:
       LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
   }
@@ -697,6 +714,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
              proto.HasExtension(serialization::UpdateWorkOrder::block_id);
     }
+    case serialization::WINDOW_AGGREGATION: {
+      return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&
+             query_context.isValidWindowAggregationStateId(
+                 proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)) &&
+             proto.HasExtension(serialization::WindowAggregationWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index));
+    }
     default:
       return false;
   }


[4/5] incubator-quickstep git commit: Logged all sent messages using glog.

Posted by zu...@apache.org.
Logged all sent messages using glog.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d9135a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d9135a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d9135a8a

Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: d9135a8a2d11a1eabf6705c88391c498f4be38bb
Parents: 6168996
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 22:49:59 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 19:48:01 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp         | 14 ++--
 query_execution/PolicyEnforcerDistributed.cpp | 38 ++++------
 query_execution/QueryExecutionUtil.hpp        |  6 +-
 query_execution/Shiftboss.cpp                 | 86 +++++++++-------------
 query_execution/Worker.cpp                    |  6 +-
 relational_operators/DeleteOperator.cpp       | 10 +--
 relational_operators/RebuildWorkOrder.hpp     |  7 +-
 relational_operators/UpdateOperator.cpp       | 10 +--
 relational_operators/WorkOrder.hpp            |  7 +-
 storage/InsertDestination.cpp                 | 18 +++--
 storage/InsertDestination.hpp                 | 10 +--
 11 files changed, 92 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index d064a6f..7596b00 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -168,16 +168,15 @@ void ForemanSingleNode::run() {
       // Signal the main thread that there are no queries to be executed.
       // Currently the message doesn't have any real content.
       TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
+      DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage
+                 << "') to CLI with TMB client ID " << main_thread_client_id_;
       const tmb::MessageBus::SendStatus send_status =
           QueryExecutionUtil::SendTMBMessage(
               bus_,
               foreman_client_id_,
               main_thread_client_id_,
               move(completion_tagged_message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-          << "Message could not be sent from Foreman with TMB client ID "
-          << foreman_client_id_ << " to main thread with TMB client ID"
-          << main_thread_client_id_;
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     }
   }
 }
@@ -225,15 +224,14 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index,
   }
   TaggedMessage worker_tagged_message(&message, sizeof(message), type);
 
+  DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type
+             << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          worker_directory_->getClientID(worker_thread_index),
                                          move(worker_tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-      "Message could not be sent from Foreman with TMB client ID "
-      << foreman_client_id_ << " to Foreman with TMB client ID "
-      << worker_directory_->getClientID(worker_thread_index);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 const std::vector<WorkOrderTimeEntry>& ForemanSingleNode

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6d0de47..c76a9e1 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -170,25 +170,22 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
                         kQueryInitiateMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
-            << "') to Shiftboss 0";
-
   // TODO(zuyu): Multiple Shiftbosses support.
+  DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   // Wait Shiftboss for QueryInitiateResponseMessage.
   const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
-  LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
-            << "' message from client " << annotated_message.sender;
+  DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+             << "' message from client " << annotated_message.sender;
 
   S::QueryInitiateResponseMessage proto_response;
   CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -215,30 +212,27 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                           kQueryTeardownMessage);
 
     // TODO(zuyu): Support multiple shiftbosses.
-    LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-              << "') to Shiftboss 0";
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+               << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
     tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            shiftboss_directory_->getClientId(0),
                                            move(message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to Shiftboss";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
     // Notify the CLI query execution successfully.
-    LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
-              << "') to CLI with TMB client id " << cli_id;
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
+               << kQueryExecutionSuccessMessage
+               << "') to CLI with TMB client id " << cli_id;
     send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            cli_id,
                                            move(cli_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-        << " to CLI with TMB client ID " << cli_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
     return;
   }
 
@@ -263,17 +257,15 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                         kSaveQueryResultMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
-            << "') to Shiftboss 0";
   // TODO(zuyu): Support multiple shiftbosses.
+  DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          foreman_client_id_,
                                          shiftboss_directory_->getClientId(0),
                                          move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
-      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 906fb6b..feb4cc0 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -128,11 +128,11 @@ class QueryExecutionUtil {
     address.All(true);
     TaggedMessage poison_tagged_message(kPoisonMessage);
 
+    DLOG(INFO) << "TMB client ID " << sender_id
+               << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";
     const tmb::MessageBus::SendStatus send_status = bus->Send(
         sender_id, address, style, std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-       "Broadcast poison message from sender with TMB client ID " << sender_id
-       << " failed";
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index bd83dd4..ddfd47f 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -73,9 +73,9 @@ void Shiftboss::run() {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
     AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-              << "') received the typed '" << annotated_message.tagged_message.message_type()
-              << "' message from client " << annotated_message.sender;
+    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+               << "') received the typed '" << annotated_message.tagged_message.message_type()
+               << "' message from client " << annotated_message.sender;
     switch (annotated_message.tagged_message.message_type()) {
       case kShiftbossRegistrationResponseMessage: {
         foreman_client_id_ = annotated_message.sender;
@@ -121,18 +121,16 @@ void Shiftboss::run() {
                                             kWorkOrderMessage);
 
         const size_t worker_index = getSchedulableWorker();
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                  << "') from Foreman to worker " << worker_index;
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                   << "') from Foreman to worker " << worker_index;
 
         const MessageBus::SendStatus send_status =
             QueryExecutionUtil::SendTMBMessage(bus_,
                                                shiftboss_client_id_,
                                                workers_->getClientID(worker_index),
                                                move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kInitiateRebuildMessage: {
@@ -153,9 +151,10 @@ void Shiftboss::run() {
       case kRebuildWorkOrderCompleteMessage:
       case kDataPipelineMessage:
       case kWorkOrderFeedbackMessage: {
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                   << "' message from Worker with TMB client ID '" << annotated_message.sender
+                   << "' to Foreman with TMB client ID " << foreman_client_id_;
 
         DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
         const MessageBus::SendStatus send_status =
@@ -163,9 +162,7 @@ void Shiftboss::run() {
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kSaveQueryResultMessage: {
@@ -190,23 +187,21 @@ void Shiftboss::run() {
                                        kSaveQueryResultResponseMessage);
         free(proto_response_bytes);
 
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                  << "') to Foreman";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                   << "') to Foreman with TMB client ID " << foreman_client_id_;
         const MessageBus::SendStatus send_status =
             QueryExecutionUtil::SendTMBMessage(bus_,
                                                shiftboss_client_id_,
                                                foreman_client_id_,
                                                move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to Foreman with TMB client ID " << foreman_client_id_;
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
       case kPoisonMessage: {
-        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                  << "') from Foreman to all workers";
+        DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                   << "') from Foreman to all workers";
 
         tmb::MessageStyle broadcast_style;
         broadcast_style.Broadcast(true);
@@ -216,9 +211,7 @@ void Shiftboss::run() {
                        worker_addresses_,
                        broadcast_style,
                        move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK)
-            << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_
-            << " to All workers";
+        CHECK(send_status == MessageBus::SendStatus::kOK);
         return;
       }
       default: {
@@ -245,10 +238,6 @@ size_t Shiftboss::getSchedulableWorker() {
 }
 
 void Shiftboss::registerWithForeman() {
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
-            << "') to all";
-
   tmb::Address all_addresses;
   all_addresses.All(true);
 
@@ -266,6 +255,9 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+             << "') to all";
   tmb::MessageBus::SendStatus send_status =
       bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
@@ -285,10 +277,6 @@ void Shiftboss::processQueryInitiateMessage(
                        bus_));
   query_contexts_.emplace(query_id, move(query_context));
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
-            << "') to Foreman";
-
   serialization::QueryInitiateResponseMessage proto;
   proto.set_query_id(query_id);
 
@@ -301,14 +289,15 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+             << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -324,10 +313,6 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   vector<MutableBlockReference> partially_filled_block_refs;
   insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
 
-  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
-            << "') to Foreman";
-
   serialization::InitiateRebuildResponseMessage proto;
   proto.set_query_id(query_id);
   proto.set_operator_index(op_index);
@@ -343,14 +328,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+             << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
+             << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          shiftboss_client_id_,
                                          foreman_client_id_,
                                          move(message_response));
-  CHECK(send_status == MessageBus::SendStatus::kOK)
-      << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-      << " to Foreman with TMB client ID " << foreman_client_id_;
+  CHECK(send_status == MessageBus::SendStatus::kOK);
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of
@@ -371,18 +357,16 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
-              << "') to worker " << worker_index;
+    DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+               << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+               << "') to worker " << worker_index;
 
     const MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            shiftboss_client_id_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
-    CHECK(send_status == MessageBus::SendStatus::kOK)
-        << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
-        << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
+    CHECK(send_status == MessageBus::SendStatus::kOK);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index d497be6..0b1efba 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -101,12 +101,12 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
       static_cast<const void *>(proto_bytes), proto_length, message_type);
   std::free(proto_bytes);
 
+  DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type
+             << "') to Scheduler with TMB client ID " << receiver;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_, worker_client_id_, receiver, std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "Message could not be sent from worker with TMB client ID "
-      << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 template <typename CompletionMessageProtoT>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 8197aef..24da9bf 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -146,17 +146,15 @@ void DeleteWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index fe4be68..2cef1f1 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -101,15 +101,14 @@ class RebuildWorkOrder : public WorkOrder {
 
     // Refer to InsertDestination::sendBlockFilledMessage for the rationale
     // behind using the ClientIDMap map.
+    DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+               << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            ClientIDMap::Instance()->getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could "
-        " not be sent from thread with TMB client ID " <<
-        ClientIDMap::Instance()->getValue() << " to Foreman with TMB client ID "
-        << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index bc29365..143c741 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -128,17 +128,15 @@ void UpdateWorkOrder::execute() {
                                     kDataPipelineMessage);
   std::free(proto_bytes);
 
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
+  DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(
           bus_,
-          worker_thread_client_id,
+          ClientIDMap::Instance()->getValue(),
           scheduler_client_id_,
           std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not"
-      " be sent from thread with TMB client ID " <<
-      worker_thread_client_id << " to Foreman with TMB client ID "
-      << scheduler_client_id_;
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 3cbab94..c1b9b68 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -44,7 +44,6 @@ namespace quickstep {
  *  @{
  */
 
-
 /**
  * @brief A single unit of work in a query plan, produced by a
  *        RelationalOperator. Where possible, WorkOrders should be of
@@ -284,14 +283,14 @@ class WorkOrder {
     tmb::MessageStyle single_receiver_style;
 
     DCHECK(bus != nullptr);
+    DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage
+               << "') to Scheduler with TMB client ID " << receiver_id;
     const tmb::MessageBus::SendStatus send_status =
         bus->Send(sender_id,
                   receiver_address,
                   single_receiver_style,
                   std::move(msg));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could"
-        " not be sent from thread with TMB client ID " << sender_id << " to"
-        " receiver thread with TMB client ID " << receiver_id;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 9897aed..5e83453 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -282,13 +282,15 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -330,13 +332,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
@@ -445,13 +449,15 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
                            kCatalogRelationNewBlockMessage);
   free(proto_bytes);
 
+  DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '"
+             << kCatalogRelationNewBlockMessage
+             << "') to Scheduler with TMB client ID " << scheduler_client_id_;
   const tmb::MessageBus::SendStatus send_status =
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          thread_id_map_.getValue(),
                                          scheduler_client_id_,
                                          move(tagged_msg));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
-      << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman.";
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 
   return storage_manager_->getBlockMutable(new_id, relation_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3dae9a0..408e76b 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -249,16 +249,14 @@ class InsertDestination : public InsertDestinationInterface {
     // option 3.
     DCHECK(bus_ != nullptr);
 
-    const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+    DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage
+               << "') to Scheduler with TMB client ID " << scheduler_client_id_;
     const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
-                                           worker_thread_client_id,
+                                           thread_id_map_.getValue(),
                                            scheduler_client_id_,
                                            std::move(tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
-        "Message could not be sent from thread with TMB client ID "
-        << worker_thread_client_id << " to Scheduler with TMB client"
-        " ID " << scheduler_client_id_;
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
   }
 
   inline const std::size_t getQueryID() const {


[5/5] incubator-quickstep git commit: Added ForemanDistributed and unit tests.

Posted by zu...@apache.org.
Added ForemanDistributed and unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5f450d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5f450d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5f450d7c

Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: 5f450d7c0a63340a845742f6dea3880b68a05889
Parents: d9135a8
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 11:57:35 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Wed Aug 10 11:26:36 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  40 ++-
 query_execution/ForemanDistributed.cpp          | 334 +++++++++++++++++++
 query_execution/ForemanDistributed.hpp          | 130 ++++++++
 query_execution/PolicyEnforcerDistributed.cpp   |   4 +-
 query_execution/PolicyEnforcerDistributed.hpp   |   2 +-
 query_execution/QueryExecutionTypedefs.hpp      |   2 +-
 query_execution/Shiftboss.cpp                   |  20 +-
 query_execution/Shiftboss.hpp                   |  22 +-
 query_optimizer/tests/CMakeLists.txt            |  40 +++
 .../tests/DistributedExecutionGeneratorTest.cpp |  57 ++++
 .../DistributedExecutionGeneratorTestRunner.cpp | 121 +++++++
 .../DistributedExecutionGeneratorTestRunner.hpp | 143 ++++++++
 .../tests/execution_generator/CMakeLists.txt    |  72 +++-
 13 files changed, 964 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 74fcafb..1b27194 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,8 +31,11 @@ endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
 if (ENABLE_DISTRIBUTED)
@@ -52,12 +55,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
 add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
   add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,12 +83,32 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_threading_ThreadUtil
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanBase
                       glog
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ForemanDistributed
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_ForemanBase
+                        quickstep_queryexecution_PolicyEnforcerDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_EqualsAnyConstant
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
@@ -223,7 +246,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_DAG
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
@@ -262,7 +285,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
                       glog
                       quickstep_relationaloperators_WorkOrder_proto
@@ -316,11 +339,12 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_ForemanDistributed
                         quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)
@@ -346,7 +370,7 @@ if (ENABLE_DISTRIBUTED)
                         tmb
                         ${LIBS})
   add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 add_executable(QueryManagerSingleNode_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..3a20459
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,334 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+class QueryHandle;
+
+ForemanDistributed::ForemanDistributed(
+    MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
+      kShiftbossRegistrationResponseMessage,
+      kQueryInitiateMessage,
+      kWorkOrderMessage,
+      kInitiateRebuildMessage,
+      kQueryTeardownMessage,
+      kSaveQueryResultMessage,
+      kQueryExecutionSuccessMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : sender_message_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+  }
+
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kShiftbossRegistrationMessage,
+      kAdmitRequestMessage,
+      kQueryInitiateResponseMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kInitiateRebuildResponseMessage,
+      kWorkOrderCompleteMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kSaveQueryResultResponseMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : receiver_message_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcerDistributed(
+      foreman_client_id_,
+      catalog_database_,
+      &shiftboss_directory_,
+      bus_,
+      profile_individual_workorders));
+}
+
+void ForemanDistributed::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the foreman thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  // Ensure that at least one Shiftboss to register.
+  if (shiftboss_directory_.empty()) {
+    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+              << "' message from client " << annotated_message.sender;
+
+    S::ShiftbossRegistrationMessage proto;
+    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+    DCHECK_EQ(1u, shiftboss_directory_.size());
+  }
+
+  // Event loop
+  for (;;) {
+    // Receive() causes this thread to sleep until next message is received.
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    LOG(INFO) << "ForemanDistributed received typed '" << message_type
+              << "' message from client " << annotated_message.sender;
+    switch (message_type) {
+      case kShiftbossRegistrationMessage: {
+        S::ShiftbossRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+        break;
+      }
+      case kAdmitRequestMessage: {
+        const AdmitRequestMessage *request_message =
+            static_cast<const AdmitRequestMessage*>(tagged_message.message());
+
+        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
+        DCHECK(!query_handles.empty());
+
+        bool all_queries_admitted = true;
+        if (query_handles.size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles.front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
+        }
+        break;
+      }
+      case kQueryInitiateResponseMessage: {
+        // TODO(zuyu): check the query id.
+        break;
+      }
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage: {
+        policy_enforcer_->processMessage(tagged_message);
+        break;
+      }
+      case kInitiateRebuildResponseMessage: {
+        // A unique case in the distributed version.
+        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+        break;
+      }
+      case kSaveQueryResultResponseMessage: {
+        S::SaveQueryResultResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+        break;
+      }
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "Foreman thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+
+        // Shutdown all Shiftbosses.
+        tmb::Address shiftboss_addresses;
+        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+        }
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        TaggedMessage poison_message(kPoisonMessage);
+
+        const MessageBus::SendStatus send_status =
+            bus_->Send(foreman_client_id_,
+                       shiftboss_addresses,
+                       broadcast_style,
+                       move(poison_message));
+        DCHECK(send_status == MessageBus::SendStatus::kOK);
+        return;
+      }
+      default:
+        LOG(FATAL) << "Unknown message type to Foreman";
+    }
+
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+      dispatchWorkOrderMessages(new_messages);
+    }
+  }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                        kCatalogRelationNewBlockMessage,
+                                        kWorkOrderFeedbackMessage) &&
+         // TODO(zuyu): Multiple Shiftbosses support.
+         !shiftboss_directory_.hasReachedCapacity(0);
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  for (const auto &message : messages) {
+    DCHECK(message != nullptr);
+    // TODO(zuyu): Multiple Shiftbosses support.
+    sendWorkOrderMessage(0, *message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+  }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kWorkOrderMessage);
+  free(proto_bytes);
+
+  const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+  LOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
+            << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<WorkOrderTimeEntry> &recorded_times =
+      policy_enforcer_->getProfilingResults(query_id);
+  fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    const std::size_t worker_id = workorder_entry.worker_id;
+    fprintf(out,
+            "%lu,%lu,%lu,%lu\n",
+            query_id,
+            worker_id,
+            workorder_entry.operator_id,  // Operator ID.
+            workorder_entry.end_time - workorder_entry.start_time);  // Time.
+  }
+}
+
+void ForemanDistributed::processShiftbossRegisterationMessage(const client_id shiftboss_client_id,
+                                                              const std::size_t work_order_capacity) {
+  S::ShiftbossRegistrationResponseMessage proto;
+  proto.set_shiftboss_index(shiftboss_directory_.size());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationResponseMessage);
+  free(proto_bytes);
+
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  LOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+            << kShiftbossRegistrationResponseMessage
+            << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  S::QueryExecutionSuccessMessage proto;
+  proto.mutable_result_relation()->MergeFrom(
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryExecutionSuccessMessage);
+  free(proto_bytes);
+
+  // Notify the CLI regarding the query result.
+  LOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+            << "') to CLI with TMB client id " << cli_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         cli_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..4404e29
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to Shiftbosses. It also
+ *        receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        be profiled or not.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+  **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
+   *        worker threads.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified worker.
+   *
+   * @param worker_index The logical index of the recipient worker in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t worker_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id,
+                                            const std::size_t work_order_capacity);
+
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  ShiftbossDirectory shiftboss_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..3fba899 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,7 +58,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " can be allocated in a single round of dispatch of messages to"
               " the workers.");
 
-void PolicyEnforcerDistributed::getWorkOrderMessages(
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
     vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
   // Iterate over admitted queries until either there are no more
   // messages available, or the maximum number of messages have
@@ -67,7 +67,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
   // TODO(harshad) - Make this function generic enough so that it
   // works well when multiple queries are getting executed.
   if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..f2ab23a 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -78,7 +78,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    *
    * @param work_order_messages The work order messages to be dispatched.
    **/
-  void getWorkOrderMessages(
+  void getWorkOrderProtoMessages(
       std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..0ad77c7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,7 +63,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
 
 // We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
-  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted, from
+  kAdmitRequestMessage = 0,  // Requesting a query (or queries) to be admitted, from
                          // the main thread to Foreman.
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
                                       proto.relation_id());
         break;
       }
-      case kWorkOrderCompleteMessage:  // Fall through.
-      case kRebuildWorkOrderCompleteMessage:
+      case kCatalogRelationNewBlockMessage:  // Fall through.
       case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage: {
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrderCompleteMessage:
+      case kRebuildWorkOrderCompleteMessage: {
         DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                    << "') forwarded typed '" << annotated_message.tagged_message.message_type()
                    << "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
         CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
+      case kQueryTeardownMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryTeardownMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        query_contexts_.erase(proto.query_id());
+        break;
+      }
       case kSaveQueryResultMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
@@ -175,8 +185,12 @@ void Shiftboss::run() {
           storage_manager_->saveBlockOrBlob(proto.blocks(i));
         }
 
+        // Clean up query execution states, i.e., QueryContext.
+        query_contexts_.erase(proto.query_id());
+
         serialization::SaveQueryResultResponseMessage proto_response;
         proto_response.set_relation_id(proto.relation_id());
+        proto_response.set_cli_id(proto.cli_id());
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <unordered_map>
 
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
 
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
     // Message sent to Worker.
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
 
-    // Message sent to Foreman.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
     // Forward the following message types from Foreman to Workers.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
 
     // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
 
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+    // Clean up query execution states, i.e., QueryContext.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
 
     // Stop itself.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 1453291..357313c 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -80,6 +80,14 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
                       quickstep_utility_Macros
                       tmb)
 
+if (ENABLE_DISTRIBUTED)
+  add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+                 DistributedExecutionGeneratorTest.cpp
+                 DistributedExecutionGeneratorTestRunner.cpp
+                 DistributedExecutionGeneratorTestRunner.hpp
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
 add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                ExecutionGeneratorTest.cpp
                ExecutionGeneratorTestRunner.cpp
@@ -109,6 +117,38 @@ add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
 
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+                        glog
+                        gtest
+                        gtest_main
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_DropRelation
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryoptimizer_ExecutionGenerator
+                        quickstep_queryoptimizer_LogicalGenerator
+                        quickstep_queryoptimizer_OptimizerContext
+                        quickstep_queryoptimizer_PhysicalGenerator
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_physical_Physical
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_utility_Macros
+                        quickstep_utility_MemStream
+                        quickstep_utility_SqlError
+                        quickstep_utility_TextBasedTestDriver
+                        tmb
+                        ${LIBS})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                       ${GFLAGS_LIB_NAME}
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
new file mode 100644
index 0000000..ae14e03
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTest.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  if (argc < 4) {
+    LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1
+               << " are provided";
+  }
+
+  std::ifstream input_file(argv[1]);
+  CHECK(input_file.is_open()) << argv[1];
+  std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
+      test_runner(
+          new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
+  test_driver.reset(
+      new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+  test_driver->registerOption(
+      quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  const int success = RUN_ALL_TESTS();
+  if (success != 0) {
+    test_driver->writeActualOutputToFile(argv[2]);
+  }
+
+  return success;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
new file mode 100644
index 0000000..45851f1
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+
+#include <cstdio>
+#include <set>
+#include <string>
+
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/ExecutionGenerator.hpp"
+#include "query_optimizer/LogicalGenerator.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/PhysicalGenerator.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace optimizer {
+
+const char *DistributedExecutionGeneratorTestRunner::kResetOption =
+    "reset_before_execution";
+
+void DistributedExecutionGeneratorTestRunner::runTestCase(
+    const std::string &input, const std::set<std::string> &options,
+    std::string *output) {
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+  VLOG(4) << "Test SQL(s): " << input;
+
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_.clear();
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new std::string(input));
+
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+
+    OptimizerContext optimizer_context(test_database_loader_.catalog_database(),
+                                       test_database_loader_.storage_manager());
+
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    std::printf("%s\n", result.parsed_statement->toString().c_str());
+    try {
+      QueryHandle query_handle(query_id_++, cli_id_);
+      LogicalGenerator logical_generator(&optimizer_context);
+      PhysicalGenerator physical_generator;
+      ExecutionGenerator execution_generator(&optimizer_context,
+                                             &query_handle);
+
+      const physical::PhysicalPtr physical_plan =
+          physical_generator.generatePlan(
+              logical_generator.generatePlan(*result.parsed_statement));
+      execution_generator.generatePlan(physical_plan);
+
+      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+          cli_id_,
+          foreman_->getBusClientID(),
+          &query_handle,
+          &bus_);
+
+      const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+      DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+      const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
+      if (query_result_relation) {
+          PrintToScreen::PrintRelation(*query_result_relation,
+                                       test_database_loader_.storage_manager(),
+                                       output_stream.file());
+          DropRelation::Drop(*query_result_relation,
+                             test_database_loader_.catalog_database(),
+                             test_database_loader_.storage_manager());
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
new file mode 100644
index 0000000..ae81013
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -0,0 +1,143 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/**
+ * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
+ *        distributed version.
+ */
+class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+
+  /**
+   * @brief Constructor.
+   */
+  explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path)
+      : query_id_(0),
+        test_database_loader_(storage_path) {
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+
+    bus_.Initialize();
+
+    // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+    // could receive a registration message from the latter.
+    foreman_.reset(new ForemanDistributed(&bus_, test_database_loader_.catalog_database()));
+
+    worker_.reset(new Worker(0 /* worker_thread_index */, &bus_));
+
+    std::vector<tmb::client_id> worker_client_ids;
+    worker_client_ids.push_back(worker_->getBusClientID());
+
+    // We don't use the NUMA aware version of worker code.
+    const std::vector<numa_node_id> numa_nodes(worker_client_ids.size(), kAnyNUMANodeID);
+
+    workers_.reset(
+        new WorkerDirectory(worker_client_ids.size(), worker_client_ids, numa_nodes));
+
+    shiftboss_.reset(new Shiftboss(&bus_, test_database_loader_.storage_manager(), workers_.get()));
+
+    cli_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+    foreman_->start();
+
+    shiftboss_->start();
+    worker_->start();
+  }
+
+  ~DistributedExecutionGeneratorTestRunner() {
+    tmb::TaggedMessage poison_tagged_message(quickstep::kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(
+            &bus_,
+            cli_id_,
+            foreman_->getBusClientID(),
+            std::move(poison_tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+    worker_->join();
+    shiftboss_->join();
+
+    foreman_->join();
+  }
+
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;
+
+  SqlParserWrapper sql_parser_;
+  TestDatabaseLoader test_database_loader_;
+
+  MessageBusImpl bus_;
+
+  tmb::client_id cli_id_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> workers_;
+
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f450d7c/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..1ea6a17 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,6 +15,63 @@
 # specific language governing permissions and limitations
 # under the License.
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
+endif(ENABLE_DISTRIBUTED)
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -76,6 +133,19 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
+endif(ENABLE_DISTRIBUTED)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -83,4 +153,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)


[2/5] incubator-quickstep git commit: Fixed bugs in creating WorkOrderProtos.

Posted by zu...@apache.org.
Fixed bugs in creating WorkOrderProtos.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2c0ce6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2c0ce6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2c0ce6a3

Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: 2c0ce6a3bcf2ec40b0c32d077552cda3e225f787
Parents: 85e02de
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 18:42:32 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:42:25 2016 -0700

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp | 1 +
 relational_operators/WorkOrderFactory.cpp | 4 +---
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 7851f41..779c0fe 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -393,6 +393,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *
 serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::HASH_JOIN);
+  proto->set_query_id(query_id_);
 
   proto->SetExtension(serialization::HashJoinWorkOrder::hash_join_work_order_type,
                       serialization::HashJoinWorkOrder::HASH_OUTER_JOIN);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 721d735..7d7af59 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -533,13 +533,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
         return false;
       }
 
-      const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
       const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
       for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
         const attribute_id attr_id =
             proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
-        if (!build_relation.hasAttributeWithId(attr_id) ||
-            !probe_relation.hasAttributeWithId(attr_id)) {
+        if (!probe_relation.hasAttributeWithId(attr_id)) {
           return false;
         }
       }


[3/5] incubator-quickstep git commit: Removed an unused message type.

Posted by zu...@apache.org.
Removed an unused message type.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/61689962
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/61689962
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/61689962

Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: 6168996216af8278d5c789c67aa4ec8325fab483
Parents: 2c0ce6a
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 15:32:34 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:44:50 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp        |  4 +---
 query_execution/PolicyEnforcerBase.cpp       | 13 -------------
 query_execution/QueryExecutionMessages.proto |  5 -----
 query_execution/QueryExecutionTypedefs.hpp   |  1 -
 query_execution/Shiftboss.cpp                |  1 -
 query_execution/Shiftboss.hpp                |  1 -
 query_execution/Worker.hpp                   |  1 -
 7 files changed, 1 insertion(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 23db379..d064a6f 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -87,7 +87,6 @@ ForemanSingleNode::ForemanSingleNode(
       kPoisonMessage,
       kRebuildWorkOrderCompleteMessage,
       kWorkOrderFeedbackMessage,
-      kWorkOrdersAvailableMessage,
       kWorkOrderCompleteMessage};
 
   for (const auto message_type : receiver_message_types) {
@@ -122,8 +121,7 @@ void ForemanSingleNode::run() {
       case kDataPipelineMessage:
       case kRebuildWorkOrderCompleteMessage:
       case kWorkOrderCompleteMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrdersAvailableMessage: {
+      case kWorkOrderFeedbackMessage: {
         policy_enforcer_->processMessage(tagged_message);
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 78f7b44..4174bd6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -107,19 +107,6 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
           op_index, proto.block_id(), proto.relation_id());
       break;
     }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
-      break;
-    }
     case kWorkOrderFeedbackMessage: {
       WorkOrder::FeedbackMessage msg(
           const_cast<void *>(tagged_message.message()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 20b684e..060efa1 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -74,11 +74,6 @@ message DataPipelineMessage {
   required uint64 query_id = 4;
 }
 
-message WorkOrdersAvailableMessage {
-  required uint64 operator_index = 1;
-  required uint64 query_id = 2;
-}
-
 // Distributed version related messages.
 message ShiftbossRegistrationMessage {
   // The total Work Order processing capacity in Shiftboss, which equals to the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index d154d84..33a93b0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -69,7 +69,6 @@ enum QueryExecutionMessageType : message_type_id {
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
   kDataPipelineMessage,  // From InsertDestination or some WorkOrders to Foreman.
-  kWorkOrdersAvailableMessage,  // From some WorkOrders to Foreman.
   kWorkOrderFeedbackMessage,  // From some WorkOrders to Foreman on behalf of
                               // their corresponding RelationalOperators.
   kRebuildWorkOrderMessage,  // From Foreman to Worker.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 24c91fe..bd83dd4 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -152,7 +152,6 @@ void Shiftboss::run() {
       case kWorkOrderCompleteMessage:  // Fall through.
       case kRebuildWorkOrderCompleteMessage:
       case kDataPipelineMessage:
-      case kWorkOrdersAvailableMessage:
       case kWorkOrderFeedbackMessage: {
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9464a4d..30a8d1a 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -103,7 +103,6 @@ class Shiftboss : public Thread {
     // Message sent to Foreman.
     bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
 
     // Forward the following message types from Foreman to Workers.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index 44a7447..aa39bb3 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -68,7 +68,6 @@ class Worker : public Thread {
                                  kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(worker_client_id_, kWorkOrdersAvailableMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage);
 
     bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);