You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 01:10:07 UTC

[38/48] incubator-quickstep git commit: Added BlockLocator.

Added BlockLocator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3789da72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3789da72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3789da72

Branch: refs/heads/query-manager-used-in-foreman
Commit: 3789da728e95b91bd97587f5e34da6ff1b55ea5f
Parents: 4503198
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Sat May 28 14:24:13 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Wed Jun 8 11:57:46 2016 -0700

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                | 223 +++++++++++++++
 query_execution/BlockLocator.hpp                | 125 +++++++++
 query_execution/CMakeLists.txt                  |  48 ++++
 query_execution/QueryExecutionMessages.proto    |  34 +++
 query_execution/QueryExecutionTypedefs.hpp      |  16 ++
 query_execution/tests/BlockLocator_unittest.cpp | 270 +++++++++++++++++++
 storage/CMakeLists.txt                          |   9 +-
 storage/StorageManager.cpp                      | 190 ++++++++++++-
 storage/StorageManager.hpp                      |  76 +++++-
 9 files changed, 984 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
new file mode 100644
index 0000000..6cf5249
--- /dev/null
+++ b/query_execution/BlockLocator.cpp
@@ -0,0 +1,223 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/BlockLocator.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+void BlockLocator::run() {  // Thread body: serves TMB requests until a poison message arrives.
+  if (cpu_id_ >= 0) {  // A negative cpu_id_ means the thread is left unpinned.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, so this thread sleeps until the next
+    // message addressed to the locator arrives.
+    const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+              << "' message from TMB Client " << sender;
+    switch (tagged_message.message_type()) {  // No default case: other types are only logged above.
+      case kBlockDomainRegistrationMessage: {
+        serialization::BlockDomainRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        // Assign a fresh domain ID to the sender and reply with it.
+        processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
+        break;
+      }
+      case kAddBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+        // Record the pair in both indexes; insert().second tells whether it was new.
+        const auto result_block_locations = block_locations_[block].insert(domain);
+        const auto result_domain_blocks = domain_blocks_[domain].insert(block);
+        DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+        if (result_domain_blocks.second) {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+        }
+        break;
+      }
+      case kDeleteBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+        // NOTE: operator[] default-inserts an empty set when this block was never recorded.
+        const auto cit = block_locations_[block].find(domain);
+        if (cit != block_locations_[block].end()) {
+          block_locations_[block].erase(domain);
+          domain_blocks_[domain].erase(block);
+
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+        }
+        break;
+      }
+      case kLocateBlockMessage: {
+        serialization::BlockMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        // Reply to the sender with every domain recorded as holding the block.
+        processLocateBlockMessage(sender, proto.block_id());
+        break;
+      }
+      case kGetPeerDomainNetworkAddressesMessage: {
+        serialization::BlockMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        // Reply to the sender with the network address of every domain holding the block.
+        processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+        break;
+      }
+      case kBlockDomainUnregistrationMessage: {
+        serialization::BlockDomainMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id_domain domain = proto.block_domain();
+        // Forget the domain's address, then purge the domain from both block indexes.
+        domain_network_addresses_.erase(domain);
+
+        for (const block_id block : domain_blocks_[domain]) {
+          block_locations_[block].erase(domain);
+        }
+        domain_blocks_.erase(domain);
+
+        LOG(INFO) << "Unregistered Domain " << domain;
+        break;
+      }
+      case kPoisonMessage: {  // Shutdown request: return from run().
+        return;
+      }
+    }
+  }
+}
+
+void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,  // Assigns a new domain ID
+                                                         const std::string &network_address) {  // and sends it back.
+  DCHECK_LT(block_domain_, kMaxDomain);
+  // Pre-increment, so the first ID handed out is 1 (block_domain_ is initialized to 0).
+  domain_network_addresses_.emplace(++block_domain_, network_address);
+  domain_blocks_[block_domain_];  // operator[] creates the (empty) block set for the new domain.
+
+  serialization::BlockDomainMessage proto;
+  proto.set_block_domain(block_domain_);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kBlockDomainRegistrationResponseMessage);
+  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the payload -- confirm against TMB.
+  // NOTE: the log line below says "sent", but it is emitted before the message is actually sent.
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent BlockDomainRegistrationResponseMessage (typed '"
+            << kBlockDomainRegistrationResponseMessage
+            << "') to Worker (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+void BlockLocator::processLocateBlockMessage(const client_id receiver,  // Replies to 'receiver' with every
+                                             const block_id block) {  // domain recorded as holding 'block'.
+  serialization::LocateBlockResponseMessage proto;
+  // NOTE: operator[] default-inserts an empty set for an unknown block, which yields an empty reply.
+  for (const block_id_domain domain : block_locations_[block]) {
+    proto.add_block_domains(domain);
+  }
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kLocateBlockResponseMessage);
+  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the payload -- confirm against TMB.
+  // NOTE: the log line below says "sent", but it is emitted before the message is actually sent.
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,  // Replies with the addresses
+                                                               const block_id block) {  // of domains holding 'block'.
+  serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+  // NOTE: both operator[] lookups below default-insert an entry when the key is missing.
+  for (const block_id_domain domain : block_locations_[block]) {
+    proto.add_domain_network_addresses(domain_network_addresses_[domain]);
+  }
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kGetPeerDomainNetworkAddressesResponseMessage);
+  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the payload -- confirm against TMB.
+  // NOTE: the log line below says "sent", but it is emitted before the message is actually sent.
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+            << kGetPeerDomainNetworkAddressesResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
new file mode 100644
index 0000000..bbd9b8f
--- /dev/null
+++ b/query_execution/BlockLocator.hpp
@@ -0,0 +1,125 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+
+#include <atomic>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class for keeping track of blocks loaded in a Worker's buffer pool
+ *        in the distributed version.
+ **/
+class BlockLocator : public Thread {
+ public:
+  /**
+   * @brief Constructor. Connects to the TMB and registers all message types.
+   *
+   * @param bus A pointer to the TMB; must not be NULL.
+   * @param cpu_id The ID of the CPU to pin the BlockLocator thread to; a negative value disables pinning.
+   *
+   * @note If cpu_id is not specified, the BlockLocator thread may be moved
+   *       around between different CPUs by the OS.
+  **/
+  BlockLocator(tmb::MessageBus *bus,
+               const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        cpu_id_(cpu_id),
+        block_domain_(0) {
+    locator_client_id_ = bus_->Connect();
+    // Declare every message type that this client receives or sends.
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
+  }
+
+  ~BlockLocator() override {}
+
+  /**
+   * @brief Get the TMB client ID of BlockLocator thread.
+   *
+   * @return TMB client ID of BlockLocator thread.
+   **/
+  tmb::client_id getBusClientID() const {
+    return locator_client_id_;
+  }
+
+ protected:
+  void run() override;  // The message loop executed by this thread.
+
+ private:  // Each handler below replies to 'receiver' over the TMB.
+  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
+  void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+  void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+
+  tmb::MessageBus *bus_;  // Supplied by the caller; not deleted by this class.
+
+  // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
+  const int cpu_id_;
+  // The last block domain ID handed out (0 = none yet); pre-incremented on each registration.
+  alignas(kCacheLineBytes) std::atomic<block_id_domain> block_domain_;
+
+  // From a block domain to its network info in the ip:port format, e.g.,
+  // "0.0.0.0:0".
+  std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+
+  // From a block to its domains.
+  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+
+  // From a block domain to all blocks loaded in its buffer pool.
+  std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
+  // The TMB client ID of this BlockLocator, obtained from bus_->Connect().
+  tmb::client_id locator_client_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(BlockLocator);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..7d9d601 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -20,6 +20,9 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
                          QueryExecutionMessages.proto)
 
 # Declare micro-libs:
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+endif()
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -40,6 +43,19 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
 add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
 
 # Link dependencies:
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_BlockLocator
+                        glog
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_threading_Thread
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_Foreman
                       glog
                       gtest
@@ -176,7 +192,37 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryexecution_WorkerSelectionPolicy)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution
+                        quickstep_queryexecution_BlockLocator)
+endif()
+
 # Tests:
+if (ENABLE_DISTRIBUTED)
+  add_executable(BlockLocator_unittest
+                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp")
+  target_link_libraries(BlockLocator_unittest
+                        gflags_nothreads-static
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogAttribute
+                        quickstep_catalog_CatalogRelation
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_StorageBlob
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageManager
+                        quickstep_types_TypeFactory
+                        quickstep_types_TypeID
+                        tmb
+                        ${LIBS})
+  add_test(BlockLocator_unittest BlockLocator_unittest)
+endif()
+
 add_executable(Foreman_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
 target_link_libraries(Foreman_unittest
@@ -269,3 +315,5 @@ target_link_libraries(WorkerSelectionPolicy_unittest
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerSelectionPolicy)
 add_test(WorkerSelectionPolicy_unittest WorkerSelectionPolicy_unittest)
+
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/block_locator_test_data/)  # kStoragePath of BlockLocator_unittest.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 8d2efd0..15803cf 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,10 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+// Used for any messages that do not carry payloads, e.g., the poison message.
+message EmptyMessage {
+}
+
 // Used for both Normal WorkOrders and RebuildWorkOrders.
 // NOTE(zuyu): we might need to seperate the completion messages to contain
 // run-time information for Foreman to make better decisions on scheduling
@@ -42,3 +46,33 @@ message DataPipelineMessage {
 message WorkOrdersAvailableMessage {
   required uint64 operator_index = 1;
 }
+
+// BlockLocator related messages.
+message BlockDomainRegistrationMessage {  // Sent to BlockLocator to request a new block domain.
+  // Format IP:Port, e.g., "0.0.0.0:0".
+  required string domain_network_address = 1;
+}
+
+// Used for RegistrationResponse, Unregistration, and FailureReport.
+message BlockDomainMessage {
+  required uint32 block_domain = 1;  // A domain ID assigned by BlockLocator at registration.
+}
+
+// Used when StorageManager loads or evicts a block or a blob from its buffer
+// pool.
+message BlockLocationMessage {  // Payload of kAddBlockLocationMessage and kDeleteBlockLocationMessage.
+  required fixed64 block_id = 1;
+  required uint32 block_domain = 2;  // The domain that loaded or evicted the block.
+}
+
+message BlockMessage {  // Request payload of kLocateBlockMessage and kGetPeerDomainNetworkAddressesMessage.
+  required fixed64 block_id = 1;
+}
+
+message LocateBlockResponseMessage {  // BlockLocator's reply to kLocateBlockMessage.
+  repeated uint32 block_domains = 1;  // Every domain recorded as holding the block; empty if none.
+}
+
+message GetPeerDomainNetworkAddressesResponseMessage {  // Reply to kGetPeerDomainNetworkAddressesMessage.
+  repeated string domain_network_addresses = 1;  // One address per domain holding the block.
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 36cfd82..fc253bc 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
 
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
 #include "threading/ThreadIDBasedMap.hpp"
 
 #include "tmb/address.h"
@@ -55,6 +56,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'a',
                                      'p'>;
 
+// We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
@@ -66,6 +68,20 @@ enum QueryExecutionMessageType : message_type_id {
   kRebuildWorkOrderMessage,  // From Foreman to Worker.
   kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
   kPoisonMessage,  // From the CLI shell to Foreman, then from Foreman to Workers.
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  // BlockLocator related messages, sorted in a life cycle of StorageManager
+  // with a unique block domain.
+  kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
+  kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
+  kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
+  kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
+  kLocateBlockMessage,  // From StorageManager to BlockLocator.
+  kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
+  kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
+  kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
+  kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
+#endif
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
new file mode 100644
index 0000000..fe7b86b
--- /dev/null
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -0,0 +1,270 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class BlockLocatorTest : public ::testing::Test {  // Fixture: a BlockLocator thread, a fake worker, a StorageManager.
+ protected:
+  static const char kStoragePath[];
+  static const char kDomainNetworkAddress[];
+
+  ~BlockLocatorTest() {  // Joins the locator thread, which TearDown() has poisoned.
+    locator_->join();
+  }
+
+  virtual void SetUp() {
+    bus_.Initialize();
+    // Start the locator thread first so that it can answer the registration request below.
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+    // Connect a second TMB client that plays the role of a Worker.
+    worker_client_id_ = bus_.Connect();
+
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+    // Obtain a block domain from the locator, then hand it to the StorageManager under test.
+    block_domain_ = getBlockDomain(kDomainNetworkAddress);
+
+    storage_manager_.reset(
+        new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
+  }
+
+  virtual void TearDown() {
+    storage_manager_.reset();  // Destroyed before the locator is poisoned.
+    // An empty payload suffices: the locator returns on kPoisonMessage without parsing it.
+    serialization::EmptyMessage proto;
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kPoisonMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+  }
+
+  vector<block_id_domain> getPeerDomains(const block_id block) {  // Asks the locator which domains hold 'block'.
+    serialization::BlockMessage proto;
+    proto.set_block_id(block);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kLocateBlockMessage);
+    free(proto_bytes);
+
+  LOG(INFO) << "Worker (id '" << worker_client_id_
+            << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage
+            << "') to BlockLocator";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+    // Block until the locator answers.
+    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received LocateBlockResponseMessage from BlockLocator";
+
+    serialization::LocateBlockResponseMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    vector<block_id_domain> domains;
+    for (int i = 0; i < response_proto.block_domains_size(); ++i) {
+      domains.push_back(response_proto.block_domains(i));
+    }
+
+    return domains;
+  }
+
+  void checkLoaded(const block_id block) {  // Expects exactly this fixture's domain to hold 'block'.
+    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
+    EXPECT_EQ(1u, peer_domain_network_addresses.size());
+    EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
+
+    const vector<block_id_domain> domains = getPeerDomains(block);
+    EXPECT_EQ(1u, domains.size());
+    EXPECT_EQ(block_domain_, domains[0]);
+  }
+
+  void checkEvicted(const block_id block) {  // Expects no domain to hold 'block'.
+    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
+    EXPECT_TRUE(peer_domain_network_addresses.empty());
+
+    const vector<block_id_domain> domains = getPeerDomains(block);
+    EXPECT_TRUE(domains.empty());
+  }
+
+  tmb::client_id worker_client_id_;  // The TMB client standing in for a Worker.
+
+  block_id_domain block_domain_;  // The domain assigned to this fixture by the locator in SetUp().
+  unique_ptr<StorageManager> storage_manager_;
+
+ private:
+  block_id_domain getBlockDomain(const string &network_address) {  // Registers the address; returns the new domain.
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainRegistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+    // Block until the locator replies with the assigned domain ID.
+    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage from BlockLocator";
+
+    serialization::BlockDomainMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  MessageBusImpl bus_;
+
+  unique_ptr<BlockLocator> locator_;  // The locator under test; runs on its own thread.
+  tmb::client_id locator_client_id_;
+};
+
+const char BlockLocatorTest::kStoragePath[] = "./block_locator_test_data/";  // Created by CMakeLists.txt.
+const char BlockLocatorTest::kDomainNetworkAddress[] = "ip:port";  // A placeholder, not a real address.
+
+TEST_F(BlockLocatorTest, BlockTest) {  // Follows one block through create / evict / reload / delete.
+  CatalogRelation relation(nullptr, "rel");
+  relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
+  // A newly created block is expected to be reported as loaded in this domain.
+  const block_id block =
+      storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout());
+  checkLoaded(block);
+  // After saving and evicting it, no domain should hold the block.
+  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block));
+  storage_manager_->evictBlockOrBlob(block);
+  checkEvicted(block);
+  // Getting the block again is expected to re-register it, even after the reference is released.
+  {
+    const BlockReference block_ref = storage_manager_->getBlock(block, relation);
+  }
+  checkLoaded(block);
+  // Deleting the block file is expected to remove its location as well.
+  storage_manager_->deleteBlockOrBlobFile(block);
+  checkEvicted(block);
+}
+
+TEST_F(BlockLocatorTest, BlobTest) {  // Same life cycle as BlockTest, but for a blob.
+  const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
+  checkLoaded(blob);
+  // After saving and evicting it, no domain should hold the blob.
+  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob));
+  storage_manager_->evictBlockOrBlob(blob);
+  checkEvicted(blob);
+  // Getting the blob again is expected to re-register it, even after the reference is released.
+  {
+    const BlobReference blob_ref = storage_manager_->getBlob(blob);
+  }
+  checkLoaded(blob);
+  // Deleting the blob file is expected to remove its location as well.
+  storage_manager_->deleteBlockOrBlobFile(blob);
+  checkEvicted(blob);
+}
+
+}  // namespace quickstep
+
+int main(int argc, char **argv) {  // Test driver: sets up glog, gflags and gtest, then runs all tests.
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);  // 'true' removes the parsed flags from argv.
+
+  ::testing::InitGoogleTest(&argc, argv);
+
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 87a5e54..4da16ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -941,7 +941,8 @@ target_link_libraries(quickstep_storage_StorageManager
                       quickstep_utility_Alignment
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Macros
-                      quickstep_utility_ShardedLockManager)
+                      quickstep_utility_ShardedLockManager
+                      tmb)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_storage_StorageManager
                       quickstep_storage_FileManagerHdfs)
@@ -950,6 +951,12 @@ if (QUICKSTEP_HAVE_LIBNUMA)
   target_link_libraries(quickstep_storage_StorageManager
                         ${LIBNUMA_LIBRARY})
 endif()
+if (ENABLE_DISTRIBUTED)  # Mirrors the query_execution headers StorageManager.cpp includes under QUICKSTEP_DISTRIBUTED.
+  target_link_libraries(quickstep_storage_StorageManager
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil)
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_storage_SubBlockTypeRegistry
                       glog
                       quickstep_storage_StorageBlockLayout_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 5d91052..15e2503 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -18,6 +18,7 @@
 // This is included before other files so that we can conditionally determine
 // what else to include.
 #include "catalog/CatalogConfig.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
 #include "storage/StorageConfig.h"
 
 // Define feature test macros to enable large page support for mmap.
@@ -52,6 +53,12 @@
 #include <unordered_map>
 #include <vector>
 
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#endif
+
 #include "storage/CountedReference.hpp"
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManagerLocal.hpp"
@@ -74,6 +81,13 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+#endif
+
 using std::free;
 using std::int32_t;
 using std::memset;
@@ -81,6 +95,15 @@ using std::size_t;
 using std::string;
 using std::vector;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+using std::malloc;
+using std::move;
+using std::unique_ptr;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+#endif
+
 namespace quickstep {
 
 static bool ValidateBlockDomain(const char *flagname,
@@ -157,14 +180,21 @@ DEFINE_bool(use_hdfs, false, "Use HDFS as the persistent storage, instead of the
 #endif
 
 StorageManager::StorageManager(
-  const std::string &storage_path,
-  const block_id_domain block_domain,
-  const size_t max_memory_usage,
-  EvictionPolicy *eviction_policy)
+    const std::string &storage_path,
+    const block_id_domain block_domain,
+    const size_t max_memory_usage,
+    EvictionPolicy *eviction_policy,
+    const tmb::client_id block_locator_client_id,
+    tmb::MessageBus *bus)
     : storage_path_(storage_path),
       total_memory_usage_(0),
       max_memory_usage_(max_memory_usage),
-      eviction_policy_(eviction_policy) {
+      eviction_policy_(eviction_policy),
+#ifdef QUICKSTEP_DISTRIBUTED
+      block_domain_(block_domain),
+#endif
+      block_locator_client_id_(block_locator_client_id),
+      bus_(bus) {
 #ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
   if (FLAGS_use_hdfs) {
     file_manager_.reset(new FileManagerHdfs(storage_path));
@@ -175,10 +205,55 @@ StorageManager::StorageManager(
   file_manager_.reset(new FileManagerLocal(storage_path));
 #endif
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // NOTE(zuyu): The following if-condition is a workaround that bypasses the
+  // distributed-version code in unit tests that do not use TMB. The
+  // end-to-end functional tests for the distributed version, however, are not
+  // affected.
+  if (bus_) {
+    storage_manager_client_id_ = bus_->Connect();
+
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
+
+    LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+              << "') starts with Domain " << block_domain;
+  }
+#endif
+
   block_index_ = BlockIdUtil::GetBlockId(block_domain, file_manager_->getMaxUsedBlockCounter(block_domain));
 }
 
 StorageManager::~StorageManager() {
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    serialization::BlockDomainMessage proto;
+    proto.set_block_domain(block_domain_);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainUnregistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+              << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage
+              << "') to BlockLocator";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           storage_manager_client_id_,
+                                           block_locator_client_id_,
+                                           move(message)));
+  }
+#endif
+
   for (std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.begin();
        it != blocks_.end();
        ++it) {
@@ -222,6 +297,12 @@ block_id StorageManager::createBlock(const CatalogRelationSchema &relation,
   // Make '*eviction_policy_' aware of the new block's existence.
   eviction_policy_->blockCreated(new_block_id);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+  }
+#endif
+
   return new_block_id;
 }
 
@@ -249,6 +330,12 @@ block_id StorageManager::createBlob(const std::size_t num_slots,
   // Make '*eviction_policy_' aware of the new blob's existence.
   eviction_policy_->blockCreated(new_block_id);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+  }
+#endif
+
   return new_block_id;
 }
 
@@ -315,6 +402,12 @@ bool StorageManager::saveBlockOrBlob(const block_id block, const bool force) {
 }
 
 void StorageManager::evictBlockOrBlob(const block_id block) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(block, kDeleteBlockLocationMessage);
+  }
+#endif
+
   BlockHandle handle;
   {
     SpinSharedMutexExclusiveLock<false> write_lock(blocks_shared_mutex_);
@@ -362,6 +455,87 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
   return ++block_index_;
 }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
+  serialization::BlockMessage proto;  // The request carries only the block id.
+  proto.set_block_id(block);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // Scratch buffer for the serialized request.
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kGetPeerDomainNetworkAddressesMessage);
+  free(proto_bytes);  // NOTE(review): relies on TaggedMessage having copied the payload -- confirm in TMB.
+
+  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+            << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+            << "') to BlockLocator";  // Logged before the send below is attempted.
+
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);  // Debug-only preconditions for the round trip.
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(message)));
+
+  const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;  // Reply read on this manager's TMB client.
+  CHECK_EQ(block_locator_client_id_, annotated_message.sender);  // The reply must come from the BlockLocator ...
+  CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());  // ... with this type.
+  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+            << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+
+  serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
+  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  vector<string> domain_network_addresses;  // Copy of the repeated proto field, in response order.
+  for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) {
+    domain_network_addresses.push_back(response_proto.domain_network_addresses(i));
+  }
+
+  return domain_network_addresses;
+}
+
+void StorageManager::sendBlockLocationMessage(const block_id block,
+                                              const tmb::message_type_id message_type) {
+  switch (message_type) {  // Only the add and delete location types are accepted.
+    case kAddBlockLocationMessage:
+      LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      break;
+    case kDeleteBlockLocationMessage:
+      LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+      break;
+    default:
+      LOG(FATAL) << "Unknown message type " << message_type;  // Any other type is a fatal error.
+  }
+
+  serialization::BlockLocationMessage proto;
+  proto.set_block_id(block);
+  proto.set_block_domain(block_domain_);  // The location is reported as this StorageManager's block domain.
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // Scratch buffer for the serialized message.
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        message_type);
+  free(proto_bytes);  // NOTE(review): relies on TaggedMessage having copied the payload -- confirm in TMB.
+
+  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+            << "') sent BlockLocationMessage (typed '" << message_type
+            << "') to BlockLocator";  // Logged before the send below is attempted.
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(message)));  // bus_ is not checked here; callers guard with if (bus_).
+}
+#endif
+
 StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
     const block_id block, const int numa_node) {
   // The caller of this function holds an exclusive lock on this block/blob's
@@ -378,6 +552,12 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   loaded_handle.block_memory = block_buffer;
   loaded_handle.block_memory_size = num_slots;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(block, kAddBlockLocationMessage);
+  }
+#endif
+
   return loaded_handle;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 52326c2..55a011e 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -20,11 +20,14 @@
 
 #include <atomic>
 #include <chrono>
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
+
 #include "storage/CountedReference.hpp"
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManager.hpp"
@@ -40,6 +43,10 @@
 #include "gflags/gflags.h"
 #include "gtest/gtest_prod.h"
 
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
 namespace quickstep {
 
 DECLARE_int32(block_domain);
@@ -50,6 +57,7 @@ DECLARE_bool(use_hdfs);
 #endif
 
 class CatalogRelationSchema;
+
 class StorageBlockLayout;
 
 /** \addtogroup Storage
@@ -104,6 +112,33 @@ class StorageManager {
                            std::chrono::milliseconds(200))) {
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Constructor for the distributed version. Delegates to the general
+   *        constructor with FLAGS_buffer_pool_slots and an LRU-K eviction policy.
+   * @param storage_path The filesystem directory where blocks are persisted.
+   * @param block_domain The unique block domain.
+   * @param block_locator_client_id The TMB client ID of the block locator.
+   * @param bus A pointer to the TMB.
+   *
+   * @exception CorruptPersistentStorage The storage directory layout is not
+   *            in the expected format.
+   **/
+  StorageManager(const std::string &storage_path,
+                 const block_id_domain block_domain,
+                 const tmb::client_id block_locator_client_id,
+                 tmb::MessageBus *bus)
+      : StorageManager(storage_path,
+                       block_domain,
+                       FLAGS_buffer_pool_slots,
+                       LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+                           2,
+                           std::chrono::milliseconds(200)),
+                       block_locator_client_id,
+                       bus) {
+  }
+#endif
+
   /**
    * @brief Constructor.
    * @param storage_path The filesystem directory where blocks have persistent
@@ -121,13 +156,18 @@ class StorageManager {
    * @param eviction_policy The eviction policy that the storage manager should
    *                        use to manage the cache. The storage manager takes
    *                        ownership of *eviction_policy.
+   * @param block_locator_client_id The TMB client ID of the block locator.
+   * @param bus A pointer to the TMB.
+   *
    * @exception CorruptPersistentStorage The storage directory layout is not
    *            in the expected format.
    **/
   StorageManager(const std::string &storage_path,
                  const block_id_domain block_domain,
                  const size_t max_memory_usage,
-                 EvictionPolicy *eviction_policy);
+                 EvictionPolicy *eviction_policy,
+                 const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
+                 tmb::MessageBus *bus = nullptr);
 
   /**
    * @brief Destructor which also destroys all managed blocks.
@@ -332,6 +372,27 @@ class StorageManager {
     StorageBlockBase *block;
   };
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Ask the BlockLocator, over the TMB, for the domain network
+   *        addresses it reports for the given block, and wait for the reply.
+   *
+   * @param block The block or blob to look up.
+   *
+   * @return The domain network addresses listed in the BlockLocator's response.
+   **/
+  std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+
+  /**
+   * @brief Tell the BlockLocator that this block domain loaded or evicted a block.
+   *
+   * @param block The given block or blob.
+   * @param message_type kAddBlockLocationMessage or kDeleteBlockLocationMessage.
+   **/
+  void sendBlockLocationMessage(const block_id block,
+                                const tmb::message_type_id message_type);
+#endif
+
   // Helper for createBlock() and createBlob(). Allocates a block ID and memory
   // slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
   // writes the allocated slot range into 'handle->slot_index_low' and
@@ -459,6 +520,15 @@ class StorageManager {
 
   std::unique_ptr<EvictionPolicy> eviction_policy_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  const block_id_domain block_domain_;
+
+  tmb::client_id storage_manager_client_id_;
+#endif
+
+  const tmb::client_id block_locator_client_id_;
+  tmb::MessageBus *bus_;
+
   std::unique_ptr<FileManager> file_manager_;
 
   // Used to generate unique IDs in allocateNewBlockOrBlob().
@@ -486,6 +556,10 @@ class StorageManager {
   static constexpr std::size_t kLockManagerNumShards = 0x2000-1;
   ShardedLockManager<block_id, kLockManagerNumShards, SpinSharedMutex<false>> lock_manager_;
 
+  friend class BlockLocatorTest;
+  FRIEND_TEST(BlockLocatorTest, BlockTest);
+  FRIEND_TEST(BlockLocatorTest, BlobTest);
+
   FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
   FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);