You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:34 UTC

[42/50] incubator-quickstep git commit: Scheduling based on data locality info.

Scheduling based on data locality info.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc4086be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc4086be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc4086be

Branch: refs/heads/quickstep_partition_parser_support
Commit: bc4086be44bb400f29ae9cada17b951241040bee
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Dec 5 00:13:59 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Dec 5 00:13:59 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                | 53 +++++++++++++++++---
 query_execution/BlockLocator.hpp                | 53 ++++++++++++++++++++
 query_execution/CMakeLists.txt                  |  3 ++
 query_execution/ForemanDistributed.cpp          | 53 +++++++++++++++++++-
 query_execution/ForemanDistributed.hpp          | 13 +++--
 query_execution/QueryExecutionMessages.proto    |  6 +++
 query_execution/QueryExecutionTypedefs.hpp      |  1 +
 query_execution/Shiftboss.cpp                   |  1 +
 .../DistributedExecutionGeneratorTestRunner.cpp |  2 +-
 storage/StorageManager.cpp                      | 29 +++++++++++
 storage/StorageManager.hpp                      |  9 ++++
 11 files changed, 210 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..fa6db51 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -27,6 +27,7 @@
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "threading/SpinSharedMutex.hpp"
 #include "threading/ThreadUtil.hpp"
 
 #include "glog/logging.h"
@@ -65,6 +66,18 @@ void BlockLocator::run() {
         processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
         break;
       }
+      case kBlockDomainToShiftbossIndexMessage: {
+        serialization::BlockDomainToShiftbossIndexMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        {
+          // Lock 'block_domain_to_shiftboss_index_shared_mutex_' as briefly as
+          // possible to insert an entry for the new Shiftboss index.
+          SpinSharedMutexExclusiveLock<false> write_lock(block_domain_to_shiftboss_index_shared_mutex_);
+          block_domain_to_shiftboss_index_.emplace(proto.block_domain(), proto.shiftboss_index());
+        }
+        break;
+      }
       case kAddBlockLocationMessage: {
         serialization::BlockLocationMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -72,9 +85,15 @@ void BlockLocator::run() {
         const block_id block = proto.block_id();
         const block_id_domain domain = proto.block_domain();
 
-        const auto result_block_locations = block_locations_[block].insert(domain);
         const auto result_domain_blocks = domain_blocks_[domain].insert(block);
-        DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+        {
+          // Lock 'block_locations_shared_mutex_' as briefly as possible to
+          // insert an entry for the new block location.
+          SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+          const auto result_block_locations = block_locations_[block].insert(domain);
+          DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+        }
 
         if (result_domain_blocks.second) {
           DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
@@ -90,11 +109,20 @@ void BlockLocator::run() {
         const block_id block = proto.block_id();
         const block_id_domain domain = proto.block_domain();
 
-        const auto cit = block_locations_[block].find(domain);
-        if (cit != block_locations_[block].end()) {
-          block_locations_[block].erase(domain);
-          domain_blocks_[domain].erase(block);
+        bool block_found = false;
+        {
+          // Lock 'block_locations_shared_mutex_' as briefly as possible to
+          // delete an entry for the block location.
+          SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+          const auto cit = block_locations_[block].find(domain);
+          if (cit != block_locations_[block].end()) {
+            block_locations_[block].erase(domain);
+            block_found = true;
+          }
+        }
 
+        if (block_found) {
+          domain_blocks_[domain].erase(block);
           DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
         } else {
           DLOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
@@ -123,8 +151,13 @@ void BlockLocator::run() {
 
         domain_network_addresses_.erase(domain);
 
-        for (const block_id block : domain_blocks_[domain]) {
-          block_locations_[block].erase(domain);
+        {
+          // Lock 'block_locations_shared_mutex_' as briefly as possible to
+          // delete all entries for the block domain.
+          SpinSharedMutexExclusiveLock<false> write_lock(block_locations_shared_mutex_);
+          for (const block_id block : domain_blocks_[domain]) {
+            block_locations_[block].erase(domain);
+          }
         }
         domain_blocks_.erase(domain);
 
@@ -172,6 +205,8 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
                                              const block_id block) {
   serialization::LocateBlockResponseMessage proto;
 
+  // NOTE(zuyu): We don't need to protect here, as all the writers are in the
+  // single thread. NOTE(review): operator[] inserts on a miss -- confirm this is safe vs. concurrent readers.
   for (const block_id_domain domain : block_locations_[block]) {
     proto.add_block_domains(domain);
   }
@@ -199,6 +234,8 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r
                                                                const block_id block) {
   serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
 
+  // NOTE(zuyu): We don't need to protect here, as all the writers are in the
+  // single thread. NOTE(review): operator[] inserts on a miss -- confirm this is safe vs. concurrent readers.
   for (const block_id_domain domain : block_locations_[block]) {
     proto.add_domain_network_addresses(domain_network_addresses_[domain]);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..4690369 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
 
 #include <atomic>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -28,6 +29,7 @@
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageConstants.hpp"
+#include "threading/SpinSharedMutex.hpp"
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -67,6 +69,8 @@ class BlockLocator : public Thread {
     bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
     bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
 
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainToShiftbossIndexMessage);
+
     bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
 
@@ -91,6 +95,46 @@ class BlockLocator : public Thread {
     return locator_client_id_;
   }
 
+  /**
+   * @brief Find a Shiftboss that holds the block, for scheduling in ForemanDistributed.
+   *
+   * @param block The block to locate.
+   * @param shiftboss_index_for_block Set to the index of a Shiftboss that has
+   *        loaded the block in its buffer pool. Written only on success.
+   *
+   * @return Whether the block locality info has been found.
+   **/
+  bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+    // Private snapshot of the domains that currently have 'block' loaded.
+    std::unordered_set<block_id_domain> domains_with_block;
+    {
+      // Hold 'block_locations_shared_mutex_' as a reader only long enough to
+      // copy the domain set out of the map.
+      SpinSharedMutexSharedLock<false> locations_guard(block_locations_shared_mutex_);
+      const auto location_it = block_locations_.find(block);
+      if (location_it == block_locations_.end()) {
+        return false;
+      }
+      domains_with_block = location_it->second;
+    }
+
+    // NOTE(zuyu): This reader lock is kept until the function returns, as the
+    // exclusive case is rare.
+    SpinSharedMutexSharedLock<false> index_guard(block_domain_to_shiftboss_index_shared_mutex_);
+    for (const block_id_domain domain : domains_with_block) {
+      // TODO(quickstep-team): choose the best node, instead of the first.
+      const auto index_it = block_domain_to_shiftboss_index_.find(domain);
+      // Skip domains that have no registered Shiftboss index.
+      if (index_it == block_domain_to_shiftboss_index_.end()) {
+        continue;
+      }
+      *shiftboss_index_for_block = index_it->second;
+      return true;
+    }
+
+    return false;
+  }
+
  protected:
   void run() override;
 
@@ -110,8 +154,17 @@ class BlockLocator : public Thread {
   // "0.0.0.0:0".
   std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
 
+  // From a block domain to its Shiftboss index, used by ForemanDistributed
+  // to schedule based on the data-locality info.
+  // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+  // DistributedCli has StorageManager with a 'block_id_domain', which is not
+  // a part of Shiftboss.
+  std::unordered_map<block_id_domain, std::size_t> block_domain_to_shiftboss_index_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_to_shiftboss_index_shared_mutex_;
+
   // From a block to its domains.
   std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_locations_shared_mutex_;
 
   // From a block domain to all blocks loaded in its buffer pool.
   std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..0f74384 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -80,6 +80,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageConstants
+                        quickstep_threading_SpinSharedMutex
                         quickstep_threading_Thread
                         quickstep_threading_ThreadUtil
                         quickstep_utility_Macros
@@ -105,6 +106,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +116,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_StorageBlockInfo
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..0fa701d 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -64,10 +66,12 @@ namespace S = serialization;
 class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
+    const BlockLocator &block_locator,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
     const int cpu_id)
     : ForemanBase(bus, cpu_id),
+      block_locator_(block_locator),
       catalog_database_(DCHECK_NOTNULL(catalog_database)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kShiftbossRegistrationResponseMessage,
@@ -296,7 +300,50 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
 }
 
 namespace {
+
 constexpr size_t kDefaultShiftbossIndex = 0u;
+
+// Picks a Shiftboss for a nested-loops join from the locality of its left block,
+// else of its right block. Returns false for other types or if neither is found.
+bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+                                const BlockLocator &block_locator,
+                                std::size_t *shiftboss_index_for_join) {
+  if (work_order_proto.work_order_type() == S::NESTED_LOOP_JOIN) {
+    const block_id left = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+    if (!block_locator.getBlockLocalityInfo(left, shiftboss_index_for_join)) {
+      const block_id right = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+      return block_locator.getBlockLocalityInfo(right, shiftboss_index_for_join);
+    }
+    return true;
+  }
+  return false;
+}
+
+// Extracts the block of a SAVE_BLOCKS, SELECT or SORT_RUN_GENERATION work order
+// and looks up its locality; returns false for any other work order type.
+bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+                          const BlockLocator &block_locator,
+                          std::size_t *shiftboss_index_for_block) {
+  block_id block = kInvalidBlockId;
+  switch (work_order_proto.work_order_type()) {
+    case S::SAVE_BLOCKS:
+      block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+      break;
+    case S::SELECT:
+      block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+      break;
+    case S::SORT_RUN_GENERATION:
+      block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+      break;
+    default:
+      // Not a work order type handled by this helper.
+      return false;
+  }
+
+  DCHECK_NE(block, kInvalidBlockId);
+  return block_locator.getBlockLocalityInfo(block, shiftboss_index_for_block);
+}
+
 }  // namespace
 
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
@@ -306,14 +353,18 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
     const S::WorkOrderMessage &proto = *message;
+    const S::WorkOrder &work_order_proto = proto.work_order();
     size_t shiftboss_index_for_particular_work_order_type;
     if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
       // Always schedule the single-node query to the same Shiftboss.
       shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
     } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    } else if (hasBlockLocalityInfo(work_order_proto, block_locator_,
+                                    &shiftboss_index_for_particular_work_order_type)) {
+    } else if (isNestedLoopsJoinWorkOrder(work_order_proto, block_locator_,
+                                          &shiftboss_index_for_particular_work_order_type)) {
     } else {
-      // TODO(zuyu): Take data-locality into account for scheduling.
       shiftboss_index_for_particular_work_order_type = shiftboss_index;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..ed09fda 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -33,6 +33,7 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class BlockLocator;
 class CatalogDatabaseLite;
 
 namespace serialization { class WorkOrderMessage; }
@@ -51,6 +52,7 @@ class ForemanDistributed final : public ForemanBase {
   /**
    * @brief Constructor.
    *
+   * @param block_locator The block locator that manages block location info.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +60,11 @@ class ForemanDistributed final : public ForemanBase {
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  ForemanDistributed(tmb::MessageBus *bus,
-                     CatalogDatabaseLite *catalog_database,
-                     const int cpu_id = -1);
+  ForemanDistributed(
+      const BlockLocator &block_locator,
+      tmb::MessageBus *bus,
+      CatalogDatabaseLite *catalog_database,
+      const int cpu_id = -1);
 
   ~ForemanDistributed() override {}
 
@@ -111,6 +115,9 @@ class ForemanDistributed final : public ForemanBase {
    **/
   bool canCollectNewMessages(const tmb::message_type_id message_type);
 
+  // To get block locality info for scheduling.
+  const BlockLocator &block_locator_;
+
   ShiftbossDirectory shiftboss_directory_;
 
   CatalogDatabaseLite *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..93e458c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -142,6 +142,12 @@ message BlockDomainMessage {
   required uint32 block_domain = 1;
 }
 
+// Pairs a block domain with a Shiftboss index; used for the block locality based scheduling in ForemanDistributed.
+message BlockDomainToShiftbossIndexMessage {
+  required uint32 block_domain = 1;
+  required uint64 shiftboss_index = 2;
+}
+
 // Used when StorageManager loads or evicts a block or a blob from its buffer
 // pool.
 message BlockLocationMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..919e45b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -99,6 +99,7 @@ enum QueryExecutionMessageType : message_type_id {
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
   kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
+  kBlockDomainToShiftbossIndexMessage,  // From StorageManager to BlockLocator.
   kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
   kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
   kLocateBlockMessage,  // From StorageManager to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ed4bade..2ed42d0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -286,6 +286,7 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
   shiftboss_index_ = proto.shiftboss_index();
+  storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
 
   // Forward this message to Workers regarding <shiftboss_index_>.
   QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..45d4fdf 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -98,7 +98,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database());
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index a202f71..6299cda 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -230,6 +230,8 @@ StorageManager::StorageManager(
     bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
     bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
 
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
+
     bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
     bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
@@ -470,6 +472,33 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
 }
 
 #ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index) {
+  serialization::BlockDomainToShiftbossIndexMessage proto;
+  proto.set_block_domain(block_domain_);
+  proto.set_shiftboss_index(shiftboss_index);
+  // Serialize the proto into a temporary heap buffer sized by ByteSize().
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+  // NOTE(review): presumably TaggedMessage copies the bytes, as the buffer is freed right after -- confirm.
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kBlockDomainToShiftbossIndexMessage);
+  free(proto_bytes);
+  // This is logged before the actual send below.
+  DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+             << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage
+             << "') to BlockLocator";
+  // Send the message to BlockLocator; any status other than kOK fails the CHECK.
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(message)));
+}
+
 void StorageManager::pullBlockOrBlob(const block_id block,
                                      PullResponse *response) const {
   SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc4086be/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..b61f10a 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -382,6 +382,15 @@ class StorageManager {
 
 #ifdef QUICKSTEP_DISTRIBUTED
   /**
+   * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
+   *        ForemanDistributed can take advantage of block locality info
+   *        for a better scheduling policy.
+   *
+   * @param shiftboss_index The Shiftboss index.
+   **/
+  void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
+
+  /**
    * @brief Pull a block or a blob. Used by DataExchangerAsync.
    *
    * @param block The id of the block or blob.