You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/05 04:58:05 UTC

[1/5] incubator-quickstep git commit: Logged only in the debug mode. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-executable 8ea875d04 -> c128191cc (forced update)


Logged only in the debug mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ff89dd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ff89dd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ff89dd5

Branch: refs/heads/dist-executable
Commit: 5ff89dd5427bf904e1406111bc2877836f586196
Parents: c608e99
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Nov 30 23:07:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Nov 30 23:07:25 2016 -0800

----------------------------------------------------------------------
 storage/StorageManager.cpp | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ff89dd5/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 56ca323..a202f71 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -557,9 +557,9 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
                         kGetPeerDomainNetworkAddressesMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
-            << "') to BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+             << "') to BlockLocator";
 
   DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
   DCHECK(bus_ != nullptr);
@@ -573,8 +573,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   CHECK_EQ(block_locator_client_id_, annotated_message.sender);
   CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
 
   serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
   CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -591,10 +591,10 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
                                               const tmb::message_type_id message_type) {
   switch (message_type) {
     case kAddBlockLocationMessage:
-      LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      DLOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
       break;
     case kDeleteBlockLocationMessage:
-      LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      DLOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
       break;
     default:
       LOG(FATAL) << "Unknown message type " << message_type;
@@ -613,9 +613,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block,
                         message_type);
   free(proto_bytes);
 
-  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
-            << "') sent BlockLocationMessage (typed '" << message_type
-            << "') to BlockLocator";
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent BlockLocationMessage (typed '" << message_type
+             << "') to BlockLocator";
   CHECK(MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          storage_manager_client_id_,
@@ -635,7 +635,7 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // TODO(quickstep-team): Use a cost model to determine whether to load from
   // a remote peer or the disk.
   if (BlockIdUtil::Domain(block) != block_domain_) {
-    LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
     const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
     for (const string &peer_domain_network_address : peer_domain_network_addresses) {
       DataExchangerClientAsync client(
@@ -648,8 +648,8 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
       }
     }
 
-    LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-              << " from remote peers, so try to load from disk.";
+    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+               << " from remote peers, so try to load from disk.";
   }
 #endif
 


[3/5] incubator-quickstep git commit: Marked SingleNodeQuery for Insertions.

Posted by zu...@apache.org.
Marked SingleNodeQuery for Insertions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a

Branch: refs/heads/dist-executable
Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6
Parents: e50a2b7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:11:58 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:44:24 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 16 +++++++---
 query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++---
 query_optimizer/ExecutionGenerator.cpp        |  3 ++
 query_optimizer/QueryHandle.hpp               | 37 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 61f0603..0dad8b0 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   return true;
 }
 
+namespace {
+constexpr size_t kDefaultShiftbossIndex = 0u;
+}  // namespace
+
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
-  const size_t num_shiftbosses = shiftboss_directory_.size();
-  size_t shiftboss_index = 0u;
+  static size_t shiftboss_index = kDefaultShiftbossIndex;
+
+  PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
     const S::WorkOrderMessage &proto = *message;
     size_t shiftboss_index_for_particular_work_order_type;
-    if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
+      // Always schedule the single-node query to the same Shiftboss.
+      shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+    } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else {
       // TODO(zuyu): Take data-locality into account for scheduling.
@@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
     shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
 
     if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
-      shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+      shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
     } else {
       // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case,
       // <shiftboss_index_for_particular_work_order_type> might be scheduled one

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e8bc394..2c00a6b 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -17,14 +17,20 @@
 
 #include <cstddef>
 #include <memory>
+#include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 #include "tmb/id_typedefs.h"
 
 namespace tmb {
@@ -35,10 +41,6 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
-class QueryHandle;
-class QueryManagerBase;
-
-namespace serialization { class WorkOrderMessage; }
 
 /** \addtogroup QueryExecution
  *  @{
@@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
   /**
+   * @brief Whether the query should be executed on one Shiftboss.
+   *
+   * @param query_id The query id.
+   *
+   * @return Whether the query should be executed on one Shiftboss.
+   **/
+  bool isSingleNodeQuery(const std::size_t query_id) const {
+    const auto cit = admitted_queries_.find(query_id);
+    DCHECK(cit != admitted_queries_.end());
+    return cit->second->query_handle()->is_single_node_query();
+  }
+
+  /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
    * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..5a2c450 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable(
 void ExecutionGenerator::convertInsertTuple(
     const P::InsertTuplePtr &physical_plan) {
   // InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED
+  query_handle_->set_is_single_node_query();
+#endif  // QUICKSTEP_DISTRIBUTED
 
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 1ca6021..cbd1cd9 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/Catalog.pb.h"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
 
@@ -134,6 +135,22 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Whether the query will be executed in the single node.
+   */
+  bool is_single_node_query() const {
+    return is_single_node_query_;
+  }
+
+  /**
+   * @brief Set the query to be executed in the single node.
+   */
+  void set_is_single_node_query() {
+    is_single_node_query_ = true;
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
+
  private:
   const std::size_t query_id_;
 
@@ -153,6 +170,26 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Indicate whether the query should be executed on the default Shiftboss for
+  // correctness purpose.
+  // An example would be the insert query that might otherwise need block
+  // invalidation among multiple StorageManagers. In this case, an insert query
+  // has scheduled on node 0, and the block is in the buffer pool of node 0.
+  // Another insert query on the same relation might be scheduled on another
+  // node, say node 1, which will pull the block from node 0, and do the
+  // insertion. Thus, two blocks with the same block id in two nodes
+  // have different contents, which is incorrect.
+  // One approach is to evict blocks cached in all other nodes for every
+  // change. It, however, does not scale, and even worse, it will also affect
+  // the performance of each select query.
+  // Alternatively, we choose to mark the query as a single-node query to
+  // modify blocks on the default node only. But if the changed block has also
+  // cached in another node, this approach would still produce inconsistent
+  // query result.
+  bool is_single_node_query_ = false;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };
 


[4/5] incubator-quickstep git commit: Scheduling based on data locality info.

Posted by zu...@apache.org.
Scheduling based on data locality info.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/137daf9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/137daf9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/137daf9a

Branch: refs/heads/dist-executable
Commit: 137daf9aa02e3fe6d9585828bbab2face3c393ad
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 20:41:10 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 20:41:10 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                |  7 +++
 query_execution/BlockLocator.hpp                | 33 +++++++++-
 query_execution/CMakeLists.txt                  |  2 +
 query_execution/ForemanDistributed.cpp          | 65 +++++++++++++++++++-
 query_execution/ForemanDistributed.hpp          | 27 +++++++-
 query_execution/QueryExecutionMessages.proto    |  6 ++
 query_execution/QueryExecutionTypedefs.hpp      |  1 +
 query_execution/Shiftboss.cpp                   |  1 +
 .../DistributedExecutionGeneratorTestRunner.cpp |  4 +-
 storage/StorageManager.cpp                      | 29 +++++++++
 storage/StorageManager.hpp                      |  9 +++
 11 files changed, 178 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..e0fc47a 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -65,6 +65,13 @@ void BlockLocator::run() {
         processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
         break;
       }
+      case kBlockDomainToShiftbossIndexMessage: {
+        serialization::BlockDomainToShiftbossIndexMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        block_domain_to_shiftboss_index_.emplace(proto.block_domain(), proto.shiftboss_index());
+        break;
+      }
       case kAddBlockLocationMessage: {
         serialization::BlockLocationMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..d5753a8 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
 
 #include <atomic>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -48,6 +49,9 @@ namespace quickstep {
  **/
 class BlockLocator : public Thread {
  public:
+  typedef std::unordered_map<block_id_domain, std::size_t> BlockDomainToShiftbossIndex;
+  typedef std::unordered_map<block_id, std::unordered_set<block_id_domain>> BlockLocation;
+
   /**
    * @brief Constructor.
    *
@@ -67,6 +71,8 @@ class BlockLocator : public Thread {
     bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
     bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
 
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainToShiftbossIndexMessage);
+
     bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
 
@@ -91,6 +97,24 @@ class BlockLocator : public Thread {
     return locator_client_id_;
   }
 
+  /**
+   * @brief Get the mapping from a block domain to its Shiftboss index.
+   *
+   * @return The mapping from a block domain to its Shiftboss index.
+   **/
+  const BlockDomainToShiftbossIndex& block_domain_to_shiftboss_index() const {
+    return block_domain_to_shiftboss_index_;
+  }
+
+  /**
+   * @brief Get the mapping from a block id to its loaded block domain.
+   *
+   * @return The mapping from a block id to its loaded block domain.
+   **/
+  const BlockLocation& block_locations() const {
+    return block_locations_;
+  }
+
  protected:
   void run() override;
 
@@ -110,8 +134,15 @@ class BlockLocator : public Thread {
   // "0.0.0.0:0".
   std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
 
+  // From a block domain to its Shiftboss index, used by ForemanDistributed
+  // to schedule based on the data-locality info.
+  // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+  // DistributedCli has StorageManager with a 'block_id_domain', which is not
+  // a part of Shiftboss.
+  BlockDomainToShiftbossIndex block_domain_to_shiftboss_index_;
+
   // From a block to its domains.
-  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+  BlockLocation block_locations_;
 
   // From a block domain to all blocks loaded in its buffer pool.
   std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..4181f4a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -105,6 +105,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +115,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_StorageBlockInfo
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..db82908 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -64,10 +66,14 @@ namespace S = serialization;
 class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
+    const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+    const BlockLocator::BlockLocation &block_locations,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
     const int cpu_id)
     : ForemanBase(bus, cpu_id),
+      block_domain_to_shiftboss_index_(block_domain_to_shiftboss_index),
+      block_locations_(block_locations),
       catalog_database_(DCHECK_NOTNULL(catalog_database)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kShiftbossRegistrationResponseMessage,
@@ -295,6 +301,62 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   return true;
 }
 
+bool ForemanDistributed::isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+                                                    std::size_t *shiftboss_index_for_join) {
+  if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
+    return false;
+  }
+
+  const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+  if (hasBlockLocalityInfoHelper(left_block, shiftboss_index_for_join)) {
+    return true;
+  }
+
+  const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+  return hasBlockLocalityInfoHelper(right_block, shiftboss_index_for_join);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfo(const S::WorkOrder &work_order_proto,
+                                              size_t *shiftboss_index_for_block) {
+  block_id block = kInvalidBlockId;
+  switch (work_order_proto.work_order_type()) {
+    case S::SAVE_BLOCKS: {
+      block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+      break;
+    }
+    case S::SELECT: {
+      block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+      break;
+    }
+    case S::SORT_RUN_GENERATION: {
+      block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+      break;
+    }
+    default:
+      return false;
+  }
+
+  DCHECK_NE(block, kInvalidBlockId);
+  return hasBlockLocalityInfoHelper(block, shiftboss_index_for_block);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfoHelper(const block_id block,
+                                                    size_t *shiftboss_index_for_block) {
+  const auto cit = block_locations_.find(block);
+  if (cit != block_locations_.end()) {
+    for (const block_id_domain block_domain : cit->second) {
+      // TODO(quickstep-team): choose the best node, instead of the first.
+      const auto cit_shiftboss_index = block_domain_to_shiftboss_index_.find(block_domain);
+      if (cit_shiftboss_index != block_domain_to_shiftboss_index_.end()) {
+        *shiftboss_index_for_block = cit_shiftboss_index->second;
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
 namespace {
 constexpr size_t kDefaultShiftbossIndex = 0u;
 }  // namespace
@@ -312,8 +374,9 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
       shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
     } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    } else if (hasBlockLocalityInfo(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
+    } else if (isNestedLoopsJoinWorkOrder(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
     } else {
-      // TODO(zuyu): Take data-locality into account for scheduling.
       shiftboss_index_for_particular_work_order_type = shiftboss_index;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..92d1e98 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -23,8 +23,10 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -51,6 +53,9 @@ class ForemanDistributed final : public ForemanBase {
   /**
    * @brief Constructor.
    *
+   * @param block_domain_to_shiftboss_index A mapping from a block domain to
+   *        its Shiftboss index in BlockLocator.
+   * @param block_locations Block locality info in BlockLocator.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +63,12 @@ class ForemanDistributed final : public ForemanBase {
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  ForemanDistributed(tmb::MessageBus *bus,
-                     CatalogDatabaseLite *catalog_database,
-                     const int cpu_id = -1);
+  ForemanDistributed(
+      const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+      const BlockLocator::BlockLocation &block_locations,
+      tmb::MessageBus *bus,
+      CatalogDatabaseLite *catalog_database,
+      const int cpu_id = -1);
 
   ~ForemanDistributed() override {}
 
@@ -79,6 +87,15 @@ class ForemanDistributed final : public ForemanBase {
                                   const std::size_t next_shiftboss_index_to_schedule,
                                   std::size_t *shiftboss_index_for_hash_join);
 
+  bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+                                  std::size_t *shiftboss_index_for_join);
+
+  bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+                            std::size_t *shiftboss_index_for_block);
+
+  bool hasBlockLocalityInfoHelper(const block_id block,
+                                  std::size_t *shiftboss_index_for_block);
+
   /**
    * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
    *        worker threads.
@@ -111,6 +128,10 @@ class ForemanDistributed final : public ForemanBase {
    **/
   bool canCollectNewMessages(const tmb::message_type_id message_type);
 
+  // Block locality info for scheduling.
+  const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index_;
+  const BlockLocator::BlockLocation &block_locations_;
+
   ShiftbossDirectory shiftboss_directory_;
 
   CatalogDatabaseLite *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..93e458c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -142,6 +142,12 @@ message BlockDomainMessage {
   required uint32 block_domain = 1;
 }
 
+// Used for the block locality based scheduling in ForemanDistributed.
+message BlockDomainToShiftbossIndexMessage {
+  required uint32 block_domain = 1;
+  required uint64 shiftboss_index = 2;
+}
+
 // Used when StorageManager loads or evicts a block or a blob from its buffer
 // pool.
 message BlockLocationMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..919e45b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -99,6 +99,7 @@ enum QueryExecutionMessageType : message_type_id {
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
   kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
+  kBlockDomainToShiftbossIndexMessage,  // From StorageManager to BlockLocator.
   kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
   kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
   kLocateBlockMessage,  // From StorageManager to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ed4bade..2ed42d0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -286,6 +286,7 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
   shiftboss_index_ = proto.shiftboss_index();
+  storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
 
   // Forward this message to Workers regarding <shiftboss_index_>.
   QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..7a0e720 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -98,7 +98,9 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),
+                                             block_locator_->block_locations(),
+                                             &bus_, test_database_loader_->catalog_database());
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index a202f71..6299cda 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -230,6 +230,8 @@ StorageManager::StorageManager(
     bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
     bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
 
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
+
     bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
     bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
@@ -470,6 +472,33 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
 }
 
 #ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index) {
+  serialization::BlockDomainToShiftbossIndexMessage proto;
+  proto.set_block_domain(block_domain_);
+  proto.set_shiftboss_index(shiftboss_index);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kBlockDomainToShiftbossIndexMessage);
+  free(proto_bytes);
+
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
+             << "') to BlockLocator";
+
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(message)));
+}
+
 void StorageManager::pullBlockOrBlob(const block_id block,
                                      PullResponse *response) const {
   SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/137daf9a/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..b61f10a 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -382,6 +382,15 @@ class StorageManager {
 
 #ifdef QUICKSTEP_DISTRIBUTED
   /**
+   * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
+   *        ForemanDistributed could take advantages of block locality info
+   *        for a better scheduling policy.
+   *
+   * @param shiftboss_index The Shiftboss index.
+   **/
+  void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
+
+  /**
    * @brief Pull a block or a blob. Used by DataExchangerAsync.
    *
    * @param block The id of the block or blob.


[5/5] incubator-quickstep git commit: Added DistributedCli executable.

Posted by zu...@apache.org.
Added DistributedCli executable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c128191c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c128191c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c128191c

Branch: refs/heads/dist-executable
Commit: c128191cc7b8279dcfa18fd98d6cae12364ef44c
Parents: 137daf9
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 22:32:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 20:43:15 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                               |  17 ++
 cli/CMakeLists.txt                           |   3 +
 cli/distributed/CMakeLists.txt               |  93 +++++++++
 cli/distributed/Cli.cpp                      | 239 ++++++++++++++++++++++
 cli/distributed/Cli.hpp                      |  71 +++++++
 cli/distributed/CliDistributedModule.hpp     |  23 +++
 cli/distributed/Conductor.cpp                | 182 ++++++++++++++++
 cli/distributed/Conductor.hpp                |  80 ++++++++
 cli/distributed/Executor.cpp                 |  87 ++++++++
 cli/distributed/Executor.hpp                 |  83 ++++++++
 cli/distributed/QuickstepDistributedCli.cpp  |  81 ++++++++
 cli/distributed/Role.cpp                     |  51 +++++
 cli/distributed/Role.hpp                     |  69 +++++++
 query_execution/QueryExecutionMessages.proto |   8 +
 query_execution/QueryExecutionTypedefs.hpp   |   6 +-
 validate_cmakelists.py                       |   4 +-
 16 files changed, 1095 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 126b47b..391cb26 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -782,3 +782,20 @@ endif()
 
 # Link against other required system and third-party libraries.
 target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_DISTRIBUTED)
+  # Build the quickstep_distributed_cli_shell executable.
+  add_executable (quickstep_distributed_cli_shell cli/distributed/QuickstepDistributedCli.cpp)
+  # Link against direct deps (will transitively pull in everything needed).
+  # NOTE(zuyu): Link quickstep_cli_LineReader on behalf of quickstep_cli_distributed_Cli,
+  # as a workaround for bypassing conditionally built target checks in validate_cmakelists.py.
+  target_link_libraries(quickstep_distributed_cli_shell
+                        glog
+                        quickstep_cli_LineReader
+                        quickstep_cli_distributed_Cli
+                        quickstep_cli_distributed_Conductor
+                        quickstep_cli_distributed_Executor
+                        quickstep_utility_StringUtil
+                        ${GFLAGS_LIB_NAME}
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 9b62af9..be13c82 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -16,6 +16,9 @@
 # under the License.
 
 include_directories(${CMAKE_CURRENT_BINARY_DIR})
+if (ENABLE_DISTRIBUTED)
+  add_subdirectory(distributed)
+endif(ENABLE_DISTRIBUTED)
 add_subdirectory(tests)
 
 if (WIN32)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
new file mode 100644
index 0000000..e16d8af
--- /dev/null
+++ b/cli/distributed/CMakeLists.txt
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_SHARED_LIBS)
+  set(GFLAGS_LIB_NAME gflags_nothreads-shared)
+else()
+  set(GFLAGS_LIB_NAME gflags_nothreads-static)
+endif()
+
+# Declare micro-libs and link dependencies:
+add_library(quickstep_cli_distributed_Cli Cli.cpp Cli.hpp)
+add_library(quickstep_cli_distributed_Conductor Conductor.cpp Conductor.hpp)
+add_library(quickstep_cli_distributed_Executor Executor.cpp Executor.hpp)
+add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_cli_distributed_Cli
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_cli_Flags
+                      quickstep_cli_PrintToScreen
+                      quickstep_cli_distributed_Role
+                      quickstep_parser_ParseStatement
+                      quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_BlockLocatorUtil
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_storage_DataExchangerAsync
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Conductor
+                      glog
+                      quickstep_cli_DefaultsConfigurator
+                      quickstep_cli_Flags
+                      quickstep_cli_distributed_Role
+                      quickstep_parser_ParseStatement
+                      quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_BlockLocator
+                      quickstep_queryexecution_ForemanDistributed
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryProcessor
+                      quickstep_storage_StorageConstants
+                      quickstep_utility_Macros
+                      quickstep_utility_SqlError
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Executor
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_cli_Flags
+                      quickstep_cli_distributed_Role
+                      quickstep_queryexecution_BlockLocatorUtil
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_Shiftboss
+                      quickstep_queryexecution_Worker
+                      quickstep_queryexecution_WorkerDirectory
+                      quickstep_storage_DataExchangerAsync
+                      quickstep_storage_StorageManager
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Role
+                      quickstep_utility_Macros
+                      tmb
+                      ${GFLAGS_LIB_NAME})
+
+# Module all-in-one library:
+add_library(quickstep_cli_distributed ../../empty_src.cpp CliDistributedModule.hpp)
+
+target_link_libraries(quickstep_cli_distributed
+                      quickstep_cli_distributed_Cli
+                      quickstep_cli_distributed_Conductor
+                      quickstep_cli_distributed_Executor
+                      quickstep_cli_distributed_Role)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
new file mode 100644
index 0000000..01f824d
--- /dev/null
+++ b/cli/distributed/Cli.cpp
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Cli.hpp"
+
+#include <chrono>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Flags.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "utility/StringUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::fprintf;
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Cli::init() {
+  cli_id_ = bus_.Connect();
+  DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_;
+
+  bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
+
+  DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '"
+             << kDistributedCliRegistrationMessage
+             << "') to all";
+
+  tmb::Address all_addresses;
+  all_addresses.All(true);
+  // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+  tmb::MessageStyle style;
+
+  TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage);
+  DCHECK(tmb::MessageBus::SendStatus::kOK ==
+      bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message)));
+
+  // Wait for Conductor to response.
+  const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
+  DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+            cli_reg_response_message.tagged_message.message_type());
+  conductor_client_id_ = cli_reg_response_message.sender;
+
+  DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage
+             << "' message from Conductor (id'" << conductor_client_id_ << "').";
+
+  // Setup StorageManager.
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_),
+      locator_client_id, &bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  // Prepare for submitting a query.
+  bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+}
+
+void Cli::run() {
+  LineReaderImpl line_reader("distributed_quickstep> ",
+                             "                  ...> ");
+  auto parser_wrapper = make_unique<SqlParserWrapper>();
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+  for (;;) {
+    string *command_string = new string();
+    *command_string = line_reader.getNextCommand();
+    if (command_string->size() == 0) {
+      delete command_string;
+      break;
+    }
+
+    parser_wrapper->feedNextBuffer(command_string);
+
+    bool quitting = false;
+    // A parse error should reset the parser. This is because the thrown quickstep
+    // SqlError does not do the proper reset work of the YYABORT macro.
+    bool reset_parser = false;
+    for (;;) {
+      ParseResult result = parser_wrapper->getNextStatement();
+      const ParseStatement &statement = *result.parsed_statement;
+      if (result.condition == ParseResult::kSuccess) {
+        if (statement.getStatementType() == ParseStatement::kQuit) {
+          quitting = true;
+          break;
+        }
+
+        CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
+            << "TODO(quickstep-team)";
+
+        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                   << "') to Conductor";
+        S::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                        proto_length,
+                                        kSqlQueryMessage);
+        free(proto_bytes);
+
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           cli_id_,
+                                           conductor_client_id_,
+                                           move(sql_query_message));
+
+        start = std::chrono::steady_clock::now();
+
+        const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+        DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
+                   << "' message from client " << annotated_message.sender;
+        switch (tagged_message.message_type()) {
+          case kQueryExecutionSuccessMessage: {
+            end = std::chrono::steady_clock::now();
+
+            S::QueryExecutionSuccessMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            if (proto.has_result_relation()) {
+              CatalogRelation result_relation(proto.result_relation());
+
+              PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
+
+              const vector<block_id> blocks(result_relation.getBlocksSnapshot());
+              for (const block_id block : blocks) {
+                storage_manager_->deleteBlockOrBlobFile(block);
+              }
+            }
+
+            std::chrono::duration<double, std::milli> time_in_ms = end - start;
+            printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str());
+            break;
+          }
+          case kQueryExecutionErrorMessage: {
+            S::QueryExecutionErrorMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            fprintf(stderr, "%s", proto.error_message().c_str());
+            break;
+          }
+          default: {
+            LOG(ERROR) << "Unknown TMB message type";
+          }
+        }
+      } else {
+        if (result.condition == ParseResult::kError) {
+          fprintf(stderr, "%s", result.error_message.c_str());
+        }
+        reset_parser = true;
+        break;
+      }
+    }
+
+    if (quitting) {
+      break;
+    } else if (reset_parser) {
+      parser_wrapper = make_unique<SqlParserWrapper>();
+      reset_parser = false;
+    }
+  }
+
+  bus_.Disconnect(cli_id_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Cli.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.hpp b/cli/distributed/Cli.hpp
new file mode 100644
index 0000000..32c178f
--- /dev/null
+++ b/cli/distributed/Cli.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+
+#include <memory>
+
+#include "cli/distributed/Role.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Cli component in the distributed version.
+ **/
+class Cli final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Cli() = default;
+
+  ~Cli() override {
+    data_exchanger_.shutdown();
+    storage_manager_.reset();
+    data_exchanger_.join();
+  }
+
+  void init() override;
+
+  void run() override;
+
+ private:
+  tmb::client_id cli_id_, conductor_client_id_;
+
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(Cli);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/CliDistributedModule.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/CliDistributedModule.hpp b/cli/distributed/CliDistributedModule.hpp
new file mode 100644
index 0000000..cfa1e1b
--- /dev/null
+++ b/cli/distributed/CliDistributedModule.hpp
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/** @defgroup CliDistributed
+ *
+ * The distributed QuickStep command-line interface.
+ **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
new file mode 100644
index 0000000..95e5f4d
--- /dev/null
+++ b/cli/distributed/Conductor.cpp
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Conductor.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "cli/DefaultsConfigurator.hpp"
+#include "cli/Flags.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageConstants.hpp"
+#include "utility/SqlError.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Conductor::init() {
+  try {
+    string catalog_path = FLAGS_storage_path + kCatalogFilename;
+
+    if (quickstep::FLAGS_initialize_db) {  // Initialize the database
+      DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
+    }
+
+    query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+  } catch (const std::exception &e) {
+    LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
+               << "\nIf you intended to create a new database, "
+               << "please use the \"-initialize_db=true\" command line option.";
+  } catch (...) {
+    LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+  }
+
+  bus_.ResetBus();
+
+  conductor_client_id_ = bus_.Connect();
+  DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
+
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
+
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  block_locator_->start();
+
+  foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),
+                                             block_locator_->block_locations(),
+                                             &bus_, query_processor_->getDefaultDatabase());
+  foreman_->start();
+}
+
+void Conductor::run() {
+  for (;;) {
+    AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+
+    DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+               << "' message from client " << sender;
+    switch (tagged_message.message_type()) {
+      case kDistributedCliRegistrationMessage: {
+        TaggedMessage message(kDistributedCliRegistrationResponseMessage);
+
+        DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+                   << kDistributedCliRegistrationResponseMessage
+                   << "') to Distributed CLI " << sender;
+        CHECK(MessageBus::SendStatus::kOK ==
+            QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+        break;
+      }
+      case kSqlQueryMessage: {
+        S::SqlQueryMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+        processSqlQueryMessage(sender, new string(move(proto.sql_query())));
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unknown TMB message type";
+    }
+  }
+}
+
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+  parser_wrapper_.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+  CHECK(parse_result.condition == ParseResult::kSuccess)
+      << "Any SQL syntax error should be addressed in the DistributedCli.";
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+  CHECK(statement.getStatementType() != ParseStatement::kCommand)
+     << "TODO(quickstep-team)";
+
+  try {
+    auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+                                                 sender,
+                                                 statement.getPriority());
+    query_processor_->generateQueryHandle(statement, query_handle.get());
+    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+        conductor_client_id_,
+        foreman_->getBusClientID(),
+        query_handle.release(),
+        &bus_);
+  } catch (const SqlError &sql_error) {
+    // Set the query execution status along with the error message.
+    S::QueryExecutionErrorMessage proto;
+    proto.set_error_message(sql_error.formatMessage(*command_string));
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
+               << kQueryExecutionErrorMessage
+               << "') to Distributed CLI " << sender;
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
new file mode 100644
index 0000000..e8c9582
--- /dev/null
+++ b/cli/distributed/Conductor.hpp
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Role.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Conductor component in the distributed version.
+ **/
+class Conductor final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Conductor() = default;
+
+  ~Conductor() override {
+    foreman_->join();
+    block_locator_->join();
+  }
+
+  void init() override;
+
+  void run() override;
+
+ private:
+  void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+
+  SqlParserWrapper parser_wrapper_;
+
+  std::unique_ptr<QueryProcessor> query_processor_;
+
+  tmb::client_id conductor_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  DISALLOW_COPY_AND_ASSIGN(Conductor);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
new file mode 100644
index 0000000..1d03579
--- /dev/null
+++ b/cli/distributed/Executor.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Executor.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+
+#include "glog/logging.h"
+
+using std::make_unique;
+using std::size_t;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+void Executor::init() {
+  executor_client_id_ = bus_.Connect();
+  DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
+
+  bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
+
+  vector<client_id> worker_client_ids;
+  vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
+
+  for (std::size_t worker_thread_index = 0;
+       worker_thread_index < FLAGS_num_workers;
+       ++worker_thread_index) {
+    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+    worker_client_ids.push_back(workers_.back()->getBusClientID());
+  }
+
+  worker_directory_ =
+      make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, worker_numa_nodes);
+
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), executor_client_id_, &locator_client_id, &bus_),
+      locator_client_id, &bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  shiftboss_ =
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+  shiftboss_->start();
+
+  for (const auto &worker : workers_) {
+    worker->start();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
new file mode 100644
index 0000000..6ffa756
--- /dev/null
+++ b/cli/distributed/Executor.hpp
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "cli/distributed/Role.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Executor component in the distributed version.
+ **/
+class Executor final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Executor() = default;
+
+  ~Executor() override {
+    for (const auto &worker : workers_) {
+      worker->join();
+    }
+    shiftboss_->join();
+
+    data_exchanger_.shutdown();
+    storage_manager_.reset();
+    data_exchanger_.join();
+  }
+
+  void init() override;
+
+  void run() override {}
+
+ private:
+  tmb::client_id executor_client_id_;
+
+  std::vector<std::unique_ptr<Worker>> workers_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(Executor);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
new file mode 100644
index 0000000..f01cd13
--- /dev/null
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep */
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Cli.hpp"
+#include "cli/distributed/Conductor.hpp"
+#include "cli/distributed/Executor.hpp"
+#include "cli/distributed/Role.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+using std::make_unique;
+
+namespace quickstep {
+
+constexpr char kCliRole[] = "cli";
+constexpr char kConductorRole[] = "conductor";
+constexpr char kExecutorRole[] = "executor";
+
+DEFINE_string(role, "",
+              "The role in the distributed Quickstep: Conductor, Executor, or Cli.");
+static bool ValidateRole(const char *flagname,
+                         const std::string &value) {
+  if (value.empty()) {
+    return false;
+  }
+
+  FLAGS_role = ToLower(value);
+  return FLAGS_role == kCliRole ||
+         FLAGS_role == kConductorRole ||
+         FLAGS_role == kExecutorRole;
+}
+static const volatile bool role_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_role, &ValidateRole);
+
+}  // namespace quickstep
+
+using quickstep::FLAGS_role;
+
+int main(int argc, char *argv[]) {
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  grpc_init();
+
+  std::unique_ptr<quickstep::Role> role;
+  if (FLAGS_role == quickstep::kCliRole) {
+    role = make_unique<quickstep::Cli>();
+  } else if (FLAGS_role == quickstep::kConductorRole) {
+    role = make_unique<quickstep::Conductor>();
+  } else if (FLAGS_role == quickstep::kExecutorRole) {
+    role = make_unique<quickstep::Executor>();
+  }
+
+  role->init();
+  role->run();
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Role.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.cpp b/cli/distributed/Role.cpp
new file mode 100644
index 0000000..d56ef09
--- /dev/null
+++ b/cli/distributed/Role.cpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Role.hpp"
+
+#include <cstdio>
+#include <cstdint>
+
+#include "gflags/gflags.h"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+                                 std::int32_t value) {
+  if (value > 0 && value < 65536) {
+    return true;
+  } else {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+Role::Role() {
+  bus_.AddServer(FLAGS_tmb_server_ip, FLAGS_tmb_server_port);
+  bus_.Initialize();
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/cli/distributed/Role.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.hpp b/cli/distributed/Role.hpp
new file mode 100644
index 0000000..b802543
--- /dev/null
+++ b/cli/distributed/Role.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A base class for all components in the distributed version.
+ **/
+class Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Role();
+
+  /**
+   * @brief Virtual destructor.
+   **/
+  virtual ~Role() {}
+
+  /**
+   * @brief Initialize the component.
+   **/
+  virtual void init() = 0;
+
+  /**
+   * @brief Start the component.
+   **/
+  virtual void run() = 0;
+
+ protected:
+  tmb::NativeNetClientMessageBus bus_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Role);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 93e458c..28b5ebd 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -78,6 +78,10 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
+message SqlQueryMessage {
+  required string sql_query = 1;
+}
+
 message QueryInitiateMessage {
   required uint64 query_id = 1;
   required CatalogDatabase catalog_database_cache = 2;
@@ -131,6 +135,10 @@ message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }
 
+message QueryExecutionErrorMessage {
+  required string error_message = 1;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 919e45b..faf2132 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,9 @@ enum QueryExecutionMessageType : message_type_id {
   kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
   kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss, or from
                                           // Shiftboss to Worker.
+  kDistributedCliRegistrationMessage,  // From CLI to Conductor.
+  kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
+  kSqlQueryMessage,  // From CLI to Conductor.
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 
@@ -92,8 +95,9 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
-  // From Foreman to CLI.
+  // From Foreman / Conductor to CLI.
   kQueryExecutionSuccessMessage,
+  kQueryExecutionErrorMessage,
 
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c128191c/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index f691d1f..9d1f530 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -46,7 +46,9 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
 # Explicitly ignored dependencies (special headers with no other quickstep
 # dependencies).
 IGNORED_DEPENDENCIES = frozenset(
-    ["quickstep_storage_DataExchange.grpc_proto",
+    ["quickstep_cli_LineReaderDumb",
+     "quickstep_cli_LineReaderLineNoise",
+     "quickstep_storage_DataExchange.grpc_proto",
      "quickstep_threading_WinThreadsAPI",
      "quickstep_utility_textbasedtest_TextBasedTest",
      "quickstep_utility_textbasedtest_TextBasedTestDriver",



[2/5] incubator-quickstep git commit: LOG only in the debug mode for BlockLocator.

Posted by zu...@apache.org.
LOG only in the debug mode for BlockLocator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e50a2b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e50a2b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e50a2b7a

Branch: refs/heads/dist-executable
Commit: e50a2b7aaed97f3282c338be0b36071a5cffd523
Parents: 5ff89dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Dec 3 21:24:25 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Dec 3 21:24:25 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp            | 48 ++++++++++++------------
 query_execution/QueryManagerDistributed.cpp |  4 +-
 2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 81684ba..5de6a54 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -55,8 +55,8 @@ void BlockLocator::run() {
     const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_message.tagged_message;
     const client_id sender = annotated_message.sender;
-    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
-              << "' message from TMB Client " << sender;
+    DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+               << "' message from TMB Client " << sender;
     switch (tagged_message.message_type()) {
       case kBlockDomainRegistrationMessage: {
         serialization::BlockDomainRegistrationMessage proto;
@@ -77,9 +77,9 @@ void BlockLocator::run() {
         DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
 
         if (result_domain_blocks.second) {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
         } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
         }
         break;
       }
@@ -95,9 +95,9 @@ void BlockLocator::run() {
           block_locations_[block].erase(domain);
           domain_blocks_[domain].erase(block);
 
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
         } else {
-          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+          DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
         }
         break;
       }
@@ -128,7 +128,7 @@ void BlockLocator::run() {
         }
         domain_blocks_.erase(domain);
 
-        LOG(INFO) << "Unregistered Domain " << domain;
+        DLOG(INFO) << "Unregistered Domain " << domain;
         break;
       }
       case kPoisonMessage: {
@@ -153,14 +153,14 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kBlockDomainRegistrationResponseMessage);
+                        proto_length,
+                        kBlockDomainRegistrationResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent BlockDomainRegistrationResponseMessage (typed '"
-            << kBlockDomainRegistrationResponseMessage
-            << "') to Worker (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent BlockDomainRegistrationResponseMessage (typed '"
+             << kBlockDomainRegistrationResponseMessage
+             << "') to TMB Client (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,
@@ -181,13 +181,13 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kLocateBlockResponseMessage);
+                        proto_length,
+                        kLocateBlockResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+             << "') to StorageManager (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,
@@ -208,14 +208,14 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
   TaggedMessage message(static_cast<const void*>(proto_bytes),
-                                 proto_length,
-                                 kGetPeerDomainNetworkAddressesResponseMessage);
+                        proto_length,
+                        kGetPeerDomainNetworkAddressesResponseMessage);
   free(proto_bytes);
 
-  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
-            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
-            << kGetPeerDomainNetworkAddressesResponseMessage
-            << "') to StorageManager (id '" << receiver << "')";
+  DLOG(INFO) << "BlockLocator (id '" << locator_client_id_
+             << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+             << kGetPeerDomainNetworkAddressesResponseMessage
+             << "') to StorageManager (id '" << receiver << "')";
   CHECK(tmb::MessageBus::SendStatus::kOK ==
       QueryExecutionUtil::SendTMBMessage(bus_,
                                          locator_client_id_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e50a2b7a/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 20650d0..5c7e0d8 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -176,8 +176,8 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
     shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
   }
 
-  LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
-            << "') to all Shiftbosses";
+  DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+             << "') to all Shiftbosses";
   QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
                                        shiftboss_addresses,
                                        move(tagged_msg),