You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/17 00:15:21 UTC

[1/2] incubator-quickstep git commit: Removed unnecessary messages in BlockLocator. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/refactor-data-exchange 2305fcc99 -> 8c26c31fc (forced update)


Removed unnecessary messages in BlockLocator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/22bac39c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/22bac39c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/22bac39c

Branch: refs/heads/refactor-data-exchange
Commit: 22bac39c0c25b48ce34a3a8dc4a79c51716a4e75
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 16 15:43:42 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 16 15:43:42 2017 -0700

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                | 35 --------------
 query_execution/BlockLocator.hpp                | 33 ++++++++------
 query_execution/QueryExecutionMessages.proto    |  4 --
 query_execution/QueryExecutionTypedefs.hpp      |  2 -
 query_execution/QueryExecutionUtil.hpp          |  2 -
 query_execution/tests/BlockLocator_unittest.cpp | 48 ++------------------
 6 files changed, 24 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 765021e..89c1c00 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -130,13 +130,6 @@ void BlockLocator::run() {
         }
         break;
       }
-      case kLocateBlockMessage: {
-        serialization::BlockMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processLocateBlockMessage(sender, proto.block_id());
-        break;
-      }
       case kGetPeerDomainNetworkAddressesMessage: {
         serialization::BlockMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -200,34 +193,6 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
                                          move(message)));
 }
 
-void BlockLocator::processLocateBlockMessage(const client_id receiver,
-                                             const block_id block) {
-  serialization::LocateBlockResponseMessage proto;
-
-  // NOTE(zuyu): We don't need to protect here, as all the writers are in the
-  // single thread.
-  for (const block_id_domain domain : block_locations_[block]) {
-    proto.add_block_domains(domain);
-  }
-
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kLocateBlockResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
-             << " sent LocateBlockResponseMessage to StorageManager with Client " << receiver;
-  CHECK(tmb::MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         locator_client_id_,
-                                         receiver,
-                                         move(message)));
-}
-
 void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
                                                                const block_id block) {
   serialization::GetPeerDomainNetworkAddressesResponseMessage proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 4690369..f5572ca 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -74,9 +74,6 @@ class BlockLocator : public Thread {
     bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
 
-    bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
-    bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
-
     bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
     bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
 
@@ -96,29 +93,38 @@ class BlockLocator : public Thread {
   }
 
   /**
-   * @brief Get the block locality info for scheduling in ForemanDistributed.
+   * @brief Get the block domain info for a given block.
    *
    * @param block The given block.
-   * @param shiftboss_index_for_block The index of Shiftboss that has loaded the
-   *        block in the buffer pool.
    *
-   * @return Whether the block locality info has found.
+   * @return The block domain info for a given block.
    **/
-  bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
-    std::unordered_set<block_id_domain> block_domains;
+  std::unordered_set<block_id_domain> getBlockDomains(const block_id block) const {
     {
       // Lock 'block_locations_shared_mutex_' as briefly as possible as a
       // reader.
       SpinSharedMutexSharedLock<false> read_lock(block_locations_shared_mutex_);
       const auto cit = block_locations_.find(block);
       if (cit != block_locations_.end()) {
-        block_domains = cit->second;
-      } else {
-        return false;
+        return cit->second;
       }
     }
 
-    {
+    return std::unordered_set<block_id_domain>();
+  }
+
+  /**
+   * @brief Get the block locality info for scheduling in ForemanDistributed.
+   *
+   * @param block The given block.
+   * @param shiftboss_index_for_block The index of Shiftboss that has loaded the
+   *        block in the buffer pool.
+   *
+   * @return Whether the block locality info has been found.
+   **/
+  bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+    const std::unordered_set<block_id_domain> block_domains = getBlockDomains(block);
+    if (!block_domains.empty()) {
       // NOTE(zuyu): This lock is held for the rest duration of this call, as the
       // exclusive case is rare.
       SpinSharedMutexSharedLock<false> read_lock(block_domain_to_shiftboss_index_shared_mutex_);
@@ -140,7 +146,6 @@ class BlockLocator : public Thread {
 
  private:
   void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
-  void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
   void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
 
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e8f102a..c70b339 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -164,10 +164,6 @@ message BlockMessage {
   required fixed64 block_id = 1;
 }
 
-message LocateBlockResponseMessage {
-  repeated uint32 block_domains = 1;
-}
-
 message GetPeerDomainNetworkAddressesResponseMessage {
   repeated string domain_network_addresses = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index afdac92..c56bcfd 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -115,8 +115,6 @@ enum QueryExecutionMessageType : message_type_id {
   kBlockDomainToShiftbossIndexMessage,  // From StorageManager to BlockLocator.
   kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
   kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
-  kLocateBlockMessage,  // From StorageManager to BlockLocator.
-  kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
   kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
   kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
   kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 3f74af3..1388426 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -83,8 +83,6 @@ class QueryExecutionUtil {
       case kBlockDomainToShiftbossIndexMessage:           return "BlockDomainToShiftbossIndexMessage";
       case kAddBlockLocationMessage:                      return "AddBlockLocationMessage";
       case kDeleteBlockLocationMessage:                   return "DeleteBlockLocationMessage";
-      case kLocateBlockMessage:                           return "LocateBlockMessage";
-      case kLocateBlockResponseMessage:                   return "LocateBlockResponseMessage";
       case kGetPeerDomainNetworkAddressesMessage:         return "GetPeerDomainNetworkAddressesMessage";
       case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
       case kBlockDomainUnregistrationMessage:             return "BlockDomainUnregistrationMessage";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index b73c2f7..1a7cf17 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <unordered_set>
 #include <vector>
 
 #include "catalog/CatalogAttribute.hpp"
@@ -51,6 +52,7 @@ using std::malloc;
 using std::move;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 using tmb::AnnotatedMessage;
@@ -79,9 +81,6 @@ class BlockLocatorTest : public ::testing::Test {
     bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
     bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
 
-    bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
-    bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
-
     bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
 
     block_domain_ =
@@ -106,58 +105,21 @@ class BlockLocatorTest : public ::testing::Test {
                                            move(message)));
   }
 
-  vector<block_id_domain> getPeerDomains(const block_id block) {
-    serialization::BlockMessage proto;
-    proto.set_block_id(block);
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kLocateBlockMessage);
-    free(proto_bytes);
-
-    LOG(INFO) << "Worker wth Client " << worker_client_id_ << " sent LocateBlockMessage to BlockLocator";
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           worker_client_id_,
-                                           locator_client_id_,
-                                           move(message)));
-
-    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
-    const TaggedMessage &tagged_message = annotated_message.tagged_message;
-    CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
-    LOG(INFO) << "Worker with Client " << worker_client_id_
-              << " received LocateBlockResponseMessage from BlockLocator";
-
-    serialization::LocateBlockResponseMessage response_proto;
-    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-    vector<block_id_domain> domains;
-    for (int i = 0; i < response_proto.block_domains_size(); ++i) {
-      domains.push_back(response_proto.block_domains(i));
-    }
-
-    return domains;
-  }
-
   void checkLoaded(const block_id block) {
     const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
     EXPECT_EQ(1u, peer_domain_network_addresses.size());
     EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
 
-    const vector<block_id_domain> domains = getPeerDomains(block);
+    const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
     EXPECT_EQ(1u, domains.size());
-    EXPECT_EQ(block_domain_, domains[0]);
+    EXPECT_EQ(1u, domains.count(block_domain_));
   }
 
   void checkEvicted(const block_id block) {
     const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
     EXPECT_TRUE(peer_domain_network_addresses.empty());
 
-    const vector<block_id_domain> domains = getPeerDomains(block);
+    const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
     EXPECT_TRUE(domains.empty());
   }
 


[2/2] incubator-quickstep git commit: Refactored the data exchange operation.

Posted by zu...@apache.org.
Refactored the data exchange operation.

  Unlike the original implementation, which needed two rounds of messages, the
  redesigned block pull now needs only one RPC round trip, unless an RPC fails.
  The network address of each peer StorageManager is cached locally, and is
  updated once a block is created by a newly added peer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8c26c31f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8c26c31f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8c26c31f

Branch: refs/heads/refactor-data-exchange
Commit: 8c26c31fca8f0da156dccd710ad22044da22d4f4
Parents: 22bac39
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Mar 15 02:39:02 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 16 16:41:44 2017 -0700

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                |  29 ++--
 query_execution/BlockLocator.hpp                |   6 +-
 query_execution/QueryExecutionMessages.proto    |   4 +-
 query_execution/QueryExecutionTypedefs.hpp      |   4 +-
 query_execution/QueryExecutionUtil.hpp          |   4 +-
 query_execution/tests/BlockLocator_unittest.cpp |  25 ++--
 storage/StorageManager.cpp                      | 136 ++++++++++---------
 storage/StorageManager.hpp                      |  13 +-
 8 files changed, 114 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 89c1c00..1c8c690 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -130,11 +130,8 @@ void BlockLocator::run() {
         }
         break;
       }
-      case kGetPeerDomainNetworkAddressesMessage: {
-        serialization::BlockMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+      case kGetAllDomainNetworkAddressesMessage: {
+        processGetAllDomainNetworkAddressesMessage(sender);
         break;
       }
       case kBlockDomainUnregistrationMessage: {
@@ -193,32 +190,28 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
                                          move(message)));
 }
 
-void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
-                                                               const block_id block) {
-  serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+void BlockLocator::processGetAllDomainNetworkAddressesMessage(const client_id receiver) {
+  serialization::GetAllDomainNetworkAddressesResponseMessage proto;
 
   // NOTE(zuyu): We don't need to protect here, as all the writers are in the
   // single thread.
-  for (const block_id_domain domain : block_locations_[block]) {
-    proto.add_domain_network_addresses(domain_network_addresses_[domain]);
+  for (const auto &domain_network_address_pair : domain_network_addresses_) {
+    (*proto.mutable_domain_network_addresses())[domain_network_address_pair.first] =
+        domain_network_address_pair.second;
   }
 
   const int proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kGetPeerDomainNetworkAddressesResponseMessage);
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length,
+                        kGetAllDomainNetworkAddressesResponseMessage);
   free(proto_bytes);
 
   DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
-             << " sent GetPeerDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
+             << " sent GetAllDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
   CHECK(tmb::MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         locator_client_id_,
-                                         receiver,
-                                         move(message)));
+      QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, receiver, move(message)));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index f5572ca..82c28ae 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -74,8 +74,8 @@ class BlockLocator : public Thread {
     bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
 
-    bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
-    bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kGetAllDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
 
     bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
@@ -146,7 +146,7 @@ class BlockLocator : public Thread {
 
  private:
   void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
-  void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+  void processGetAllDomainNetworkAddressesMessage(const tmb::client_id receiver);
 
   tmb::MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index c70b339..9c59985 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -164,6 +164,6 @@ message BlockMessage {
   required fixed64 block_id = 1;
 }
 
-message GetPeerDomainNetworkAddressesResponseMessage {
-  repeated string domain_network_addresses = 1;
+message GetAllDomainNetworkAddressesResponseMessage {
+  map<uint32, string> domain_network_addresses = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index c56bcfd..80da7c5 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -115,8 +115,8 @@ enum QueryExecutionMessageType : message_type_id {
   kBlockDomainToShiftbossIndexMessage,  // From StorageManager to BlockLocator.
   kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
   kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
-  kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
-  kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
+  kGetAllDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
+  kGetAllDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
   kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
 #endif
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 1388426..e7aa512 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -83,8 +83,8 @@ class QueryExecutionUtil {
       case kBlockDomainToShiftbossIndexMessage:           return "BlockDomainToShiftbossIndexMessage";
       case kAddBlockLocationMessage:                      return "AddBlockLocationMessage";
       case kDeleteBlockLocationMessage:                   return "DeleteBlockLocationMessage";
-      case kGetPeerDomainNetworkAddressesMessage:         return "GetPeerDomainNetworkAddressesMessage";
-      case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
+      case kGetAllDomainNetworkAddressesMessage:          return "GetAllDomainNetworkAddressesMessage";
+      case kGetAllDomainNetworkAddressesResponseMessage:  return "GetAllDomainNetworkAddressesResponseMessage";
       case kBlockDomainUnregistrationMessage:             return "BlockDomainUnregistrationMessage";
 #endif  // QUICKSTEP_DISTRIBUTED
       default:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 1a7cf17..1f0418a 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -106,21 +106,20 @@ class BlockLocatorTest : public ::testing::Test {
   }
 
   void checkLoaded(const block_id block) {
-    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
-    EXPECT_EQ(1u, peer_domain_network_addresses.size());
-    EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
+    unordered_set<block_id_domain> domains;
+    do {
+      domains = locator_->getBlockDomains(block);
+    } while (domains.empty());
 
-    const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
     EXPECT_EQ(1u, domains.size());
     EXPECT_EQ(1u, domains.count(block_domain_));
   }
 
   void checkEvicted(const block_id block) {
-    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
-    EXPECT_TRUE(peer_domain_network_addresses.empty());
-
-    const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
-    EXPECT_TRUE(domains.empty());
+    unordered_set<block_id_domain> domains;
+    do {
+      domains = locator_->getBlockDomains(block);
+    } while (!domains.empty());
   }
 
   tmb::client_id worker_client_id_;
@@ -146,6 +145,10 @@ TEST_F(BlockLocatorTest, BlockTest) {
       storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout());
   checkLoaded(block);
 
+  const string peer_domain_network_address =
+      storage_manager_->getPeerDomainNetworkAddress(BlockIdUtil::Domain(block));
+  EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_address.data());
+
   ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block));
   storage_manager_->evictBlockOrBlob(block);
   checkEvicted(block);
@@ -163,6 +166,10 @@ TEST_F(BlockLocatorTest, BlobTest) {
   const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
   checkLoaded(blob);
 
+  const string peer_domain_network_address =
+      storage_manager_->getPeerDomainNetworkAddress(BlockIdUtil::Domain(blob));
+  EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_address.data());
+
   ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob));
   storage_manager_->evictBlockOrBlob(blob);
   checkEvicted(blob);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..b7d87e7 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -224,13 +224,14 @@ StorageManager::StorageManager(
   if (bus_) {
     storage_manager_client_id_ = bus_->Connect();
 
-    bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
-    bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
-
     bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
 
     bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
     bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kGetAllDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
     bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
 
     LOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
@@ -548,10 +549,8 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
     return false;
   }
 
-  if (!response.is_valid()) {
-    LOG(INFO) << "The pulling block not found in all the peers";
-    return false;
-  }
+  CHECK(response.is_valid())
+      << "The pulling block not found in all the peers";
 
   const size_t num_slots = response.num_slots();
   DCHECK_NE(num_slots, 0u);
@@ -577,46 +576,54 @@ void* StorageManager::hdfs() {
   return nullptr;
 }
 
-vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
-  serialization::BlockMessage proto;
-  proto.set_block_id(block);
-
-  const int proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kGetPeerDomainNetworkAddressesMessage);
-  free(proto_bytes);
+string StorageManager::getPeerDomainNetworkAddress(const block_id_domain block_domain) {
+  {
+    SpinSharedMutexSharedLock<false> read_lock(block_domain_network_addresses_shared_mutex_);
+    const auto cit = block_domain_network_addresses_.find(block_domain);
+    if (cit != block_domain_network_addresses_.end()) {
+      return cit->second;
+    }
+  }
 
-  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
-             << " sent GetPeerDomainNetworkAddressesMessage to BlockLocator";
+  {
+    SpinSharedMutexExclusiveLock<false> write_lock(block_domain_network_addresses_shared_mutex_);
 
-  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
-  DCHECK(bus_ != nullptr);
-  CHECK(MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         storage_manager_client_id_,
-                                         block_locator_client_id_,
-                                         move(message)));
+    // Check one more time if the block domain network info got set up by someone else.
+    auto cit = block_domain_network_addresses_.find(block_domain);
+    if (cit != block_domain_network_addresses_.end()) {
+      return cit->second;
+    }
 
-  const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  CHECK_EQ(block_locator_client_id_, annotated_message.sender);
-  CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
-  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
-             << " received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+    DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+               << " sent GetAllDomainNetworkAddressesMessage to BlockLocator";
 
-  serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
-  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+    DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+    DCHECK(bus_ != nullptr);
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, block_locator_client_id_,
+                                           TaggedMessage(kGetAllDomainNetworkAddressesMessage)));
+
+    const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    CHECK_EQ(block_locator_client_id_, annotated_message.sender);
+    CHECK_EQ(kGetAllDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+    DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+               << " received GetAllDomainNetworkAddressesResponseMessage from BlockLocator";
+
+    serialization::GetAllDomainNetworkAddressesResponseMessage proto;
+    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    for (const auto &domain_network_address_pair : proto.domain_network_addresses()) {
+      const block_id_domain block_domain = domain_network_address_pair.first;
+      if (block_domain_network_addresses_.find(block_domain) == block_domain_network_addresses_.end()) {
+        block_domain_network_addresses_.emplace(block_domain, domain_network_address_pair.second);
+      }
+    }
 
-  vector<string> domain_network_addresses;
-  for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) {
-    domain_network_addresses.push_back(response_proto.domain_network_addresses(i));
+    cit = block_domain_network_addresses_.find(block_domain);
+    DCHECK(cit != block_domain_network_addresses_.end());
+    return cit->second;
   }
-
-  return domain_network_addresses;
 }
 
 void StorageManager::sendBlockLocationMessage(const block_id block,
@@ -663,37 +670,34 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // already loaded before this function gets called.
   BlockHandle loaded_handle;
 
-#ifdef QUICKSTEP_DISTRIBUTED
   // TODO(quickstep-team): Use a cost model to determine whether to load from
   // a remote peer or the disk.
-  if (BlockIdUtil::Domain(block) != block_domain_) {
-    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
-    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
-    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
-      DataExchangerClientAsync client(
-          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
-          this);
-
-      if (client.Pull(block, numa_node, &loaded_handle)) {
-        sendBlockLocationMessage(block, kAddBlockLocationMessage);
-        return loaded_handle;
-      }
-    }
+  const size_t num_slots = file_manager_->numSlots(block);
+  if (num_slots != 0) {
+    void *block_buffer = allocateSlots(num_slots, numa_node);
 
-    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-               << " from remote peers, so try to load from disk.";
-  }
-#endif
+    const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+    CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  const size_t num_slots = file_manager_->numSlots(block);
-  DEBUG_ASSERT(num_slots != 0);
-  void *block_buffer = allocateSlots(num_slots, numa_node);
+    loaded_handle.block_memory = block_buffer;
+    loaded_handle.block_memory_size = num_slots;
+  } else {
+    bool pull_succeeded = false;
 
-  const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
-  CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+    const string domain_network_address = getPeerDomainNetworkAddress(BlockIdUtil::Domain(block));
+    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from " << domain_network_address;
+    DataExchangerClientAsync client(
+        grpc::CreateChannel(domain_network_address, grpc::InsecureChannelCredentials()), this);
+    while (!client.Pull(block, numa_node, &loaded_handle)) {
+      LOG(INFO) << "Retry pulling Block " << BlockIdUtil::ToString(block) << " from " << domain_network_address;
+    }
 
-  loaded_handle.block_memory = block_buffer;
-  loaded_handle.block_memory_size = num_slots;
+    pull_succeeded = true;
+#endif
+
+    CHECK(pull_succeeded) << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+  }
 
 #ifdef QUICKSTEP_DISTRIBUTED
   if (bus_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8c26c31f/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dc4b7e8..eb40891 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -446,14 +446,13 @@ class StorageManager {
   };
 
   /**
-   * @brief Get the network info of all the remote StorageManagers which may
-   *        load the given block in the buffer pool.
+   * @brief Get the network info of the given block domain.
    *
-   * @param block The block or blob to pull.
+   * @param block_domain The domain of block or blob to pull.
    *
-   * @return The network info of all the possible peers to pull.
+   * @return The network info of the given block domain.
    **/
-  std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+  std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);
 
   /**
    * @brief Update the block location info in BlockLocator.
@@ -615,6 +614,10 @@ class StorageManager {
   std::unordered_map<block_id, BlockHandle> blocks_;
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;
 
+  // Used to pull a remote block.
+  std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;
+
   // This lock manager is used with the following contract:
   //   (1) A block cannot be evicted unless an exclusive lock is held on its
   //       lock shard.