You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/28 08:54:51 UTC

incubator-quickstep git commit: Added Pull-based DataExchange service. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/data-exchange 659adfed5 -> 3a46073dd (forced update)


Added Pull-based DataExchange service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3a46073d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3a46073d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3a46073d

Branch: refs/heads/data-exchange
Commit: 3a46073dd20cfaae8a9bd84e18cc772c61175834
Parents: 2bc4a0a
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Fri May 27 17:41:40 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Sat May 28 01:52:59 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                               |  11 +-
 query_execution/BlockLocator.cpp             | 223 ++++++++++++++++
 query_execution/BlockLocator.hpp             | 124 +++++++++
 query_execution/CMakeLists.txt               |  20 ++
 query_execution/QueryExecutionMessages.proto |  41 +++
 query_execution/QueryExecutionTypedefs.hpp   |  14 +
 storage/CMakeLists.txt                       |  72 +++++-
 storage/DataExchange.proto                   |  31 +++
 storage/DataExchangerAsync.cpp               | 165 ++++++++++++
 storage/DataExchangerAsync.hpp               |  97 +++++++
 storage/StorageManager.cpp                   | 298 +++++++++++++++++++++-
 storage/StorageManager.hpp                   | 131 +++++++++-
 storage/tests/DataExchange_unittest.cpp      | 239 +++++++++++++++++
 third_party/iwyu/iwyu_helper.py              |   1 +
 validate_cmakelists.py                       |   5 +-
 15 files changed, 1461 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 87a8f7c..3769eab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -659,7 +659,12 @@ set(ENABLE_PUREMEMORY ON CACHE BOOL "Enable PureMemory TMB")
 set(ENABLE_LEVELDB OFF CACHE BOOL "Enable LevelDB TMB")
 set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
 set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
-set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+
+# The distributed version requires the NativeNet TMB implementation.
+if (NOT ENABLE_DISTRIBUTED)
+  set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+endif()
+
 set(ENABLE ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
 set(ENABLE_VOLTDB OFF CACHE BOOL "Enable VoltDB TMB")
 set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
@@ -667,6 +672,10 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
 add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
 include_directories(${TMB_INCLUDE_DIRS})
 
+if (ENABLE_NATIVENET)
+  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+endif()
+
 # Add all of the module subdirectories. CMakeLists.txt in each of the subdirectories
 # defines how to build that module's libraries.
 add_subdirectory(catalog)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
new file mode 100644
index 0000000..16a7216
--- /dev/null
+++ b/query_execution/BlockLocator.cpp
@@ -0,0 +1,223 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/BlockLocator.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+void BlockLocator::run() {
+  if (cpu_id_ >= 0) {
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, causing this thread to sleep until the
+    // next message is received.
+    const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+              << "' message from TMB Client " << sender;
+    switch (tagged_message.message_type()) {
+      case kBlockDomainRegistrationMessage: {
+        serialization::BlockDomainRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
+        break;
+      }
+      case kAddBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+
+        const auto result_block_locations = block_locations_[block].insert(domain);
+        const auto result_domain_blocks = domain_blocks_[domain].insert(block);
+        DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+        if (result_domain_blocks.second) {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+        }
+        break;
+      }
+      case kDeleteBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+
+        const auto cit = block_locations_[block].find(domain);
+        if (cit != block_locations_[block].end()) {
+          block_locations_[block].erase(domain);
+          domain_blocks_[domain].erase(block);
+
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+        }
+        break;
+      }
+      case kLocateBlockMessage: {
+        serialization::LocateBlockMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processLocateBlockMessage(sender, proto.block_id());
+        break;
+      }
+      case kGetPeerDomainNetworkAddressesMessage: {
+        serialization::GetPeerDomainNetworkAddressesMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+        break;
+      }
+      case kBlockDomainUnregistrationMessage: {
+        serialization::BlockDomainUnregistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id_domain domain = proto.block_domain();
+
+        domain_network_addresses_.erase(domain);
+
+        for (const block_id block : domain_blocks_[domain]) {
+          block_locations_[block].erase(domain);
+        }
+        domain_blocks_.erase(domain);
+
+        LOG(INFO) << "Unregistered Domain " << domain;
+        break;
+      }
+      case kPoisonMessage: {
+        return;
+      }
+    }
+  }
+}
+
+void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,
+                                                         const std::string &network_address) {
+  DCHECK_LT(block_domain_, kMaxDomain);
+
+  domain_network_addresses_.emplace(++block_domain_, network_address);
+  domain_blocks_[block_domain_];
+
+  serialization::BlockDomainRegistrationResponseMessage proto;
+  proto.set_block_domain(block_domain_);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kBlockDomainRegistrationResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent BlockDomainRegistrationResponseMessage (typed '"
+            << kBlockDomainRegistrationResponseMessage
+            << "') to Worker (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+void BlockLocator::processLocateBlockMessage(const client_id receiver,
+                                             const block_id block) {
+  serialization::LocateBlockResponseMessage proto;
+
+  for (const block_id_domain domain : block_locations_[block]) {
+    proto.add_block_domains(domain);
+  }
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kLocateBlockResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
+                                                               const block_id block) {
+  serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+
+  for (const block_id_domain domain : block_locations_[block]) {
+    proto.add_domain_network_addresses(domain_network_addresses_[domain]);
+  }
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kGetPeerDomainNetworkAddressesResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+            << kGetPeerDomainNetworkAddressesResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(message)));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
new file mode 100644
index 0000000..a158294
--- /dev/null
+++ b/query_execution/BlockLocator.hpp
@@ -0,0 +1,124 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+
+#include <atomic>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class for keeping track of blocks loaded in a Worker's buffer pool
+ *        in the distributed version.
+ **/
+class BlockLocator : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned.
+   *
+   * @note If cpu_id is not specified, the BlockLocator thread may be moved
+   *       between CPUs by the OS.
+  **/
+  BlockLocator(tmb::MessageBus *bus,
+               const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        cpu_id_(cpu_id),
+        block_domain_(0) {
+    locator_client_id_ = bus_->Connect();
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
+  }
+
+  ~BlockLocator() override {}
+
+  /**
+   * @brief Get the TMB client ID of BlockLocator thread.
+   *
+   * @return TMB client ID of BlockLocator thread.
+   **/
+  tmb::client_id getBusClientID() const {
+    return locator_client_id_;
+  }
+
+ protected:
+  void run() override;
+
+ private:
+  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
+  void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+  void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+
+  tmb::MessageBus *bus_;
+
+  // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
+  const int cpu_id_;
+
+  std::atomic<block_id_domain> block_domain_;
+
+  // From a block domain to its network info in the ip:port format, e.g.,
+  // "0.0.0.0:0".
+  std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+
+  // From a block to its domains.
+  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+
+  // From a block domain to all blocks loaded in its buffer pool.
+  std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
+
+  tmb::client_id locator_client_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(BlockLocator);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..d5152f3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -20,6 +20,9 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
                          QueryExecutionMessages.proto)
 
 # Declare micro-libs:
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+endif()
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -40,6 +43,18 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
 add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
 
 # Link dependencies:
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_BlockLocator
+                        glog
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_threading_Thread
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_Macros
+                        tmb)
+endif()
 target_link_libraries(quickstep_queryexecution_Foreman
                       glog
                       gtest
@@ -176,6 +191,11 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryexecution_WorkerSelectionPolicy)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution
+                        quickstep_queryexecution_BlockLocator)
+endif()
+
 # Tests:
 add_executable(Foreman_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 8d2efd0..06972d1 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,10 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+// Used for any messages that do not carry payloads.
+message EmptyMessage {
+}
+
 // Used for both Normal WorkOrders and RebuildWorkOrders.
 // NOTE(zuyu): we might need to seperate the completion messages to contain
 // run-time information for Foreman to make better decisions on scheduling
@@ -42,3 +46,40 @@ message DataPipelineMessage {
 message WorkOrdersAvailableMessage {
   required uint64 operator_index = 1;
 }
+
+// BlockLocator related messages.
+message BlockDomainRegistrationMessage {
+  // Format IP:Port, e.g., "0.0.0.0:0".
+  required string domain_network_address = 1;
+}
+
+message BlockDomainRegistrationResponseMessage {
+  required uint32 block_domain = 1;
+}
+
+// Used when StorageManager loads or evicts a block or a blob from its buffer
+// pool.
+message BlockLocationMessage {
+  required fixed64 block_id = 1;
+  required uint32 block_domain = 2;
+}
+
+message LocateBlockMessage {
+  required fixed64 block_id = 1;
+}
+
+message LocateBlockResponseMessage {
+  repeated uint32 block_domains = 1;
+}
+
+message GetPeerDomainNetworkAddressesMessage {
+  required fixed64 block_id = 1;
+}
+
+message GetPeerDomainNetworkAddressesResponseMessage {
+  repeated string domain_network_addresses = 1;
+}
+
+message BlockDomainUnregistrationMessage {
+  required uint32 block_domain = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 36cfd82..6f9a88f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
 
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
 #include "threading/ThreadIDBasedMap.hpp"
 
 #include "tmb/address.h"
@@ -66,6 +67,19 @@ enum QueryExecutionMessageType : message_type_id {
   kRebuildWorkOrderMessage,  // From Foreman to Worker.
   kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
   kPoisonMessage,  // From the CLI shell to Foreman, then from Foreman to Workers.
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  // BlockLocator related messages.
+  kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
+  kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
+  kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
+  kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
+  kLocateBlockMessage,  // From StorageManager to BlockLocator.
+  kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
+  kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
+  kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
+  kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
+#endif
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 115248c..233b9bb 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -126,6 +126,13 @@ QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
                          storage_StorageBlockLayout_proto_hdrs
                          StorageBlockLayout.proto)
 
+if (ENABLE_DISTRIBUTED)
+  GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
+                    storage_DataExchange_proto_hdrs
+                    .
+                    DataExchange.proto)
+endif()
+
 # Declare micro-libs:
 add_library(quickstep_storage_AggregationOperationState
             AggregationOperationState.cpp
@@ -171,6 +178,12 @@ add_library(quickstep_storage_CompressedTupleStorageSubBlock
             CompressedTupleStorageSubBlock.hpp)
 add_library(quickstep_storage_CountedReference ../empty_src.cpp CountedReference.hpp)
 add_library(quickstep_storage_CSBTreeIndexSubBlock CSBTreeIndexSubBlock.cpp CSBTreeIndexSubBlock.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_storage_DataExchange_proto
+              ${storage_DataExchange_proto_srcs}
+              ${storage_DataExchange_proto_hdrs})
+  add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
+endif()
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
@@ -573,6 +586,17 @@ target_link_libraries(quickstep_storage_CSBTreeIndexSubBlock
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector
                       quickstep_utility_ScopedBuffer)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage_DataExchange_proto
+                        ${PROTOBUF3_LIBRARY})
+  target_link_libraries(quickstep_storage_DataExchangerAsync
+                        glog
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_utility_Macros
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif()
 target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageConstants
@@ -913,6 +937,7 @@ target_link_libraries(quickstep_storage_StorageManager
                       gflags_nothreads-static
                       glog
                       gtest
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_storage_CountedReference
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
@@ -929,7 +954,8 @@ target_link_libraries(quickstep_storage_StorageManager
                       quickstep_utility_Alignment
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Macros
-                      quickstep_utility_ShardedLockManager)
+                      quickstep_utility_ShardedLockManager
+                      tmb)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_storage_StorageManager
                       quickstep_storage_FileManagerHdfs)
@@ -938,6 +964,14 @@ if (QUICKSTEP_HAVE_LIBNUMA)
   target_link_libraries(quickstep_storage_StorageManager
                         ${LIBNUMA_LIBRARY})
 endif()
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage_StorageManager
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_DataExchange_proto
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif()
 target_link_libraries(quickstep_storage_SubBlockTypeRegistry
                       glog
                       quickstep_storage_StorageBlockLayout_proto
@@ -1051,6 +1085,11 @@ elseif (QUICKSTEP_HAVE_FILE_MANAGER_WINDOWS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerWindows)
 endif()
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_DataExchangerAsync)
+endif()
 # CMAKE_VALIDATE_IGNORE_BEGIN
 if(QUICKSTEP_HAVE_BITWEAVING)
   target_link_libraries(quickstep_storage
@@ -1356,6 +1395,37 @@ target_link_libraries(CSBTreeIndexSubBlock_unittest
                       ${LIBS})
 add_test(CSBTreeIndexSubBlock_unittest CSBTreeIndexSubBlock_unittest)
 
+if (ENABLE_DISTRIBUTED)
+  add_executable(DataExchange_unittest
+                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataExchange_unittest.cpp")
+  target_link_libraries(DataExchange_unittest
+                        gflags_nothreads-static
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogAttribute
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_CountedReference
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlob
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypeFactory
+                        quickstep_types_TypeID
+                        quickstep_types_TypedValue
+                        quickstep_types_containers_Tuple
+                        tmb
+                        ${LIBS})
+  add_test(DataExchange_unittest DataExchange_unittest)
+endif()
+
 add_executable(EvictionPolicy_unittest
                "${CMAKE_CURRENT_SOURCE_DIR}/tests/EvictionPolicy_unittest.cpp")
 target_link_libraries(EvictionPolicy_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchange.proto
----------------------------------------------------------------------
diff --git a/storage/DataExchange.proto b/storage/DataExchange.proto
new file mode 100644
index 0000000..a2636e5
--- /dev/null
+++ b/storage/DataExchange.proto
@@ -0,0 +1,31 @@
+//   Copyright 2016 Pivotal Software, Inc.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto3";
+
+package quickstep;
+
+service DataExchange {
+  rpc Pull (PullRequest) returns (PullResponse) {}
+}
+
+message PullRequest {
+  fixed64 block_id = 1;
+}
+
+message PullResponse {
+  bool is_valid = 1;
+  uint64 num_slots = 2;
+  bytes block = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
new file mode 100644
index 0000000..68230a7
--- /dev/null
+++ b/storage/DataExchangerAsync.cpp
@@ -0,0 +1,165 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "storage/DataExchangerAsync.hpp"
+
+#include <grpc++/grpc++.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#include "storage/StorageManager.hpp"
+
+#include "glog/logging.h"
+
+using grpc::ServerCompletionQueue;
+
+namespace quickstep {
+namespace {
+
+/**
+ * @brief The server-side state of a single Pull RPC.
+ *
+ * An instance is heap-allocated, registers itself with the completion queue
+ * using its own address as the tag, and deletes itself once it reaches the
+ * FINISH state in Proceed().
+ **/
+class CallContext {
+ public:
+  /**
+   * @brief Constructor. Starts the call's lifecycle right away by calling
+   *        Proceed() from the initial CREATE state.
+   *
+   * @param service The async service.
+   * @param queue The RPC request queue.
+   * @param storage_manager The StorageManager to use.
+   **/
+  CallContext(DataExchange::AsyncService *service,
+              ServerCompletionQueue *queue,
+              StorageManager *storage_manager)
+      : service_(service),
+        queue_(queue),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        responder_(&context_) {
+    Proceed();
+  }
+
+  /**
+   * @brief Process the RPC request.
+   **/
+  void Proceed();
+
+ private:
+  // The lifecycle steps of a call, in the order Proceed() goes through them.
+  enum class CallStatus {
+    CREATE = 0,
+    PROCESS,
+    FINISH,
+  };
+
+  DataExchange::AsyncService *service_;
+  ServerCompletionQueue *queue_;
+
+  StorageManager *storage_manager_;
+
+  grpc::ServerContext context_;
+
+  PullRequest request_;
+  PullResponse response_;
+
+  // Constructed with the address of 'context_', so it is declared after it.
+  grpc::ServerAsyncResponseWriter<PullResponse> responder_;
+
+  CallStatus status_ = CallStatus::CREATE;
+};
+
+// Advances this call by one step. It is invoked once from the constructor and
+// then once for every completion-queue event that carries this instance as its
+// tag:
+//   CREATE  -> register for the next incoming Pull request,
+//   PROCESS -> answer the request that arrived and queue the reply,
+//   FINISH  -> the reply has been handled, so the instance deletes itself.
+void CallContext::Proceed() {
+  switch (status_) {
+    case CallStatus::CREATE: {
+      // Change this instance progress to the PROCESS state.
+      status_ = CallStatus::PROCESS;
+
+      // As part of the initial CREATE state, we *request* that the system
+      // start processing Pull requests. In this request, "this" acts as
+      // the tag uniquely identifying the request (so that different CallContext
+      // instances can serve different requests concurrently), in this case
+      // the memory address of this CallContext instance.
+      service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this);
+      break;
+    }
+    case CallStatus::PROCESS: {
+      // Spawn a new CallContext instance to serve new clients while we process
+      // the one for this CallContext. The instance will deallocate itself as
+      // part of its FINISH state.
+      new CallContext(service_, queue_, storage_manager_);
+
+      // The actual processing.
+      storage_manager_->pullBlockOrBlob(request_.block_id(), &response_);
+
+      // And we are done! Let the gRPC runtime know we've finished, using the
+      // memory address of this instance as the uniquely identifying tag for
+      // the event.
+      status_ = CallStatus::FINISH;
+      responder_.Finish(response_, grpc::Status::OK, this);
+      break;
+    }
+    case CallStatus::FINISH: {
+      // Once in the FINISH state, deallocate ourselves (CallContext).
+      delete this;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown call status.";
+  }
+}
+
+}  // namespace
+
+// Listening address without a port number; the port that actually gets bound
+// is written to 'port_' and appended to form 'server_address_'.
+const char *DataExchangerAsync::kLocalNetworkAddress = "0.0.0.0:";
+
+// Builds and starts the gRPC server with the Pull service registered, and
+// records the address it listens on. Aborts if the server cannot be started.
+DataExchangerAsync::DataExchangerAsync() {
+  grpc::ServerBuilder builder;
+  builder.AddListeningPort(kLocalNetworkAddress, grpc::InsecureServerCredentials(), &port_);
+  builder.RegisterService(&service_);
+
+  queue_ = builder.AddCompletionQueue();
+  server_ = builder.BuildAndStart();
+
+  // Failing to start or to bind a port is an environment problem rather than a
+  // programming error, so it must be caught in release builds as well.
+  // Otherwise a bogus address would be handed to peers and shutdown() would
+  // dereference a null server.
+  CHECK(server_ != nullptr) << "DataExchangerAsync failed to start the gRPC server";
+  CHECK_GT(port_, 0) << "DataExchangerAsync failed to bind a listening port";
+
+  server_address_ = kLocalNetworkAddress + std::to_string(port_);
+  LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_;
+}
+
+// Thread body: serves Pull requests by dispatching completion-queue events to
+// the CallContext named by each event's tag, until the queue reports that no
+// more events will come.
+void DataExchangerAsync::run() {
+  // The first CallContext registers itself with the queue and owns its own
+  // lifetime from here on.
+  new CallContext(&service_, queue_.get(), storage_manager_);
+
+  void *event_tag = nullptr;  // Address of the CallContext the event is for.
+  bool event_ok = false;
+
+  while (queue_->Next(&event_tag, &event_ok)) {
+    CallContext *call_context = static_cast<CallContext*>(event_tag);
+    if (!event_ok) {
+      // The event did not complete normally, so the context will not advance
+      // to its FINISH state on its own; release it here.
+      LOG(WARNING) << "Not ok\n";
+      delete call_context;
+      continue;
+    }
+
+    call_context->Proceed();
+  }
+
+  LOG(INFO) << "Shutdown\n";
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/DataExchangerAsync.hpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.hpp b/storage/DataExchangerAsync.hpp
new file mode 100644
index 0000000..75a4e4d
--- /dev/null
+++ b/storage/DataExchangerAsync.hpp
@@ -0,0 +1,97 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+#define QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+
+#include <grpc++/grpc++.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/**
+ * @brief A thread that runs a gRPC server through which peers pull blocks and
+ *        blobs from the local StorageManager.
+ **/
+class DataExchangerAsync final : public Thread {
+ public:
+  /**
+   * @brief Constructor. Builds and starts the gRPC server, and records the
+   *        address it listens on.
+   **/
+  DataExchangerAsync();
+
+  ~DataExchangerAsync() override {}
+
+  /**
+   * @brief Set the local StorageManager.
+   *
+   * @note run() hands this pointer to the CallContext that serves requests, so
+   *       it has to be set before the thread is started.
+   *
+   * @param storage_manager The StorageManager to use.
+   **/
+  void set_storage_manager(StorageManager *storage_manager) {
+    storage_manager_ = storage_manager;
+  }
+
+  /**
+   * @brief Return its network address for peers to connect.
+   *
+   * @return Its network address, in the form "IP:port".
+   **/
+  std::string network_address() const {
+    DCHECK(!server_address_.empty());
+    return server_address_;
+  }
+
+  /**
+   * @brief Shutdown itself: first the server, then the completion queue that
+   *        run() is waiting on.
+   **/
+  void shutdown() {
+    server_->Shutdown();
+    // Always shutdown the completion queue after the server.
+    queue_->Shutdown();
+  }
+
+ protected:
+  // Thread body: dispatches completion-queue events until the queue is shut
+  // down.
+  void run() override;
+
+ private:
+  // Listening address without the port number.
+  static const char *kLocalNetworkAddress;
+
+  DataExchange::AsyncService service_;
+
+  // Port the server is bound to; filled in by the constructor.
+  int port_ = -1;
+  // Format IP:port, e.g., "0.0.0.0:0".
+  std::string server_address_;
+
+  // NOTE(review): 'queue_' is declared before 'server_', so the server is
+  // destroyed first. This presumably mirrors the shutdown order above; keep
+  // the declaration order unless gRPC's requirements are confirmed otherwise.
+  std::unique_ptr<grpc::ServerCompletionQueue> queue_;
+  std::unique_ptr<grpc::Server> server_;
+
+  // Not owned.
+  StorageManager *storage_manager_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(DataExchangerAsync);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index dfc95b8..ba9c7f2 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -18,6 +18,7 @@
 // This is included before other files so that we can conditionally determine
 // what else to include.
 #include "catalog/CatalogConfig.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
 #include "storage/StorageConfig.h"
 
 // Define feature test macros to enable large page support for mmap.
@@ -29,6 +30,10 @@
 
 #include "storage/StorageManager.hpp"
 
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <grpc++/grpc++.h>
+#endif
+
 #if defined(QUICKSTEP_HAVE_MMAP_LINUX_HUGETLB) \
     || defined(QUICKSTEP_HAVE_MMAP_BSD_SUPERPAGE) \
     || defined(QUICKSTEP_HAVE_MMAP_PLAIN)
@@ -52,7 +57,21 @@
 #include <unordered_map>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#endif
+
 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManagerLocal.hpp"
 #include "storage/StorageBlob.hpp"
@@ -74,6 +93,13 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+#endif
+
 using std::free;
 using std::int32_t;
 using std::memset;
@@ -81,6 +107,15 @@ using std::size_t;
 using std::string;
 using std::vector;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+using std::malloc;
+using std::move;
+using std::unique_ptr;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+#endif
+
 namespace quickstep {
 
 static bool ValidateBlockDomain(const char *flagname,
@@ -157,14 +192,21 @@ DEFINE_bool(use_hdfs, false, "Use HDFS as the persistent storage, instead of the
 #endif
 
 StorageManager::StorageManager(
-  const std::string &storage_path,
-  const block_id_domain block_domain,
-  const size_t max_memory_usage,
-  EvictionPolicy *eviction_policy)
+    const std::string &storage_path,
+    const block_id_domain block_domain,
+    const size_t max_memory_usage,
+    EvictionPolicy *eviction_policy,
+    const tmb::client_id block_locator_client_id,
+    tmb::MessageBus *bus)
     : storage_path_(storage_path),
       total_memory_usage_(0),
       max_memory_usage_(max_memory_usage),
-      eviction_policy_(eviction_policy) {
+      eviction_policy_(eviction_policy),
+#ifdef QUICKSTEP_DISTRIBUTED
+      block_domain_(block_domain),
+#endif
+      block_locator_client_id_(block_locator_client_id),
+      bus_(bus) {
 #ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
   if (FLAGS_use_hdfs) {
     file_manager_.reset(new FileManagerHdfs(storage_path));
@@ -175,10 +217,55 @@ StorageManager::StorageManager(
   file_manager_.reset(new FileManagerLocal(storage_path));
 #endif
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // NOTE(zuyu): The following if-condition is a workaround to bypass code for
+  // the distributed version in some unittests that do not use TMB. The
+  // end-to-end functional tests for the distributed version, however, would not
+  // be affected.
+  if (bus_) {
+    storage_manager_client_id_ = bus_->Connect();
+
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
+
+    LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+              << "') starts with Domain " << block_domain;
+  }
+#endif
+
   block_index_ = BlockIdUtil::GetBlockId(block_domain, file_manager_->getMaxUsedBlockCounter(block_domain));
 }
 
 StorageManager::~StorageManager() {
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    serialization::BlockDomainUnregistrationMessage proto;
+    proto.set_block_domain(block_domain_);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainUnregistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+              << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage
+              << "') to BlockLocator";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           storage_manager_client_id_,
+                                           block_locator_client_id_,
+                                           move(message)));
+  }
+#endif
+
   for (std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.begin();
        it != blocks_.end();
        ++it) {
@@ -221,6 +308,12 @@ block_id StorageManager::createBlock(const CatalogRelationSchema &relation,
   // Make '*eviction_policy_' aware of the new block's existence.
   eviction_policy_->blockCreated(new_block_id);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+  }
+#endif
+
   return new_block_id;
 }
 
@@ -248,6 +341,12 @@ block_id StorageManager::createBlob(const std::size_t num_slots,
   // Make '*eviction_policy_' aware of the new blob's existence.
   eviction_policy_->blockCreated(new_block_id);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+  }
+#endif
+
   return new_block_id;
 }
 
@@ -314,6 +413,12 @@ bool StorageManager::saveBlockOrBlob(const block_id block, const bool force) {
 }
 
 void StorageManager::evictBlockOrBlob(const block_id block) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(block, kDeleteBlockLocationMessage);
+  }
+#endif
+
   BlockHandle handle;
   {
     SpinSharedMutexExclusiveLock<false> write_lock(blocks_shared_mutex_);
@@ -361,11 +466,187 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
   return ++block_index_;
 }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+// Serves a peer's pull: copies the memory of 'block' into '*response' if this
+// StorageManager has an entry for it, and marks the response invalid
+// otherwise.
+void StorageManager::pullBlockOrBlob(const block_id block,
+                                     PullResponse *response) const {
+  // The shared lock is held until the block content has been copied out.
+  SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+
+  const auto handle_it = blocks_.find(block);
+  if (handle_it == blocks_.end()) {
+    response->set_is_valid(false);
+    return;
+  }
+
+  const BlockHandle &found_handle = handle_it->second;
+  const std::size_t slot_count = found_handle.block_memory_size;
+
+  response->set_is_valid(true);
+  response->set_num_slots(slot_count);
+  response->set_block(found_handle.block_memory,
+                      slot_count * kSlotSizeBytes);
+}
+
+// Creates the RPC stub on top of 'channel' and keeps the StorageManager whose
+// allocateSlots() provides the memory for pulled blocks.
+StorageManager::DataExchangerClientAsync::DataExchangerClientAsync(
+    const std::shared_ptr<grpc::Channel> &channel,
+    StorageManager *storage_manager)
+    : stub_(DataExchange::NewStub(channel)),
+      storage_manager_(storage_manager) {}
+
+// Pulls 'block' from the peer behind 'stub_' and copies its content into
+// memory allocated from 'storage_manager_' on 'numa_node'.
+//
+// The call goes through the async stub, but this function waits for its
+// completion before returning. On success it fills in
+// 'block_handle->block_memory' and 'block_handle->block_memory_size' and
+// returns true. It returns false, without touching '*block_handle', when the
+// RPC fails, the peer does not have the block, or the reply is malformed.
+bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
+                                                    const numa_node_id numa_node,
+                                                    BlockHandle *block_handle) {
+  grpc::ClientContext context;
+
+  PullRequest request;
+  request.set_block_id(block);
+
+  grpc::CompletionQueue queue;
+
+  unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+      stub_->AsyncPull(&context, request, &queue));
+
+  PullResponse response;
+  grpc::Status status;
+
+  // 'queue' carries this single call only, so any fixed non-null tag is enough
+  // to recognize its completion event.
+  void *const expected_tag = reinterpret_cast<void*>(1);
+  rpc->Finish(&response, &status, expected_tag);
+
+  void *got_tag = nullptr;
+  bool ok = false;
+
+  // Without an event there is no tag to inspect, so check Next() itself too.
+  const bool has_event = queue.Next(&got_tag, &ok);
+  CHECK(has_event);
+  CHECK(got_tag == expected_tag);
+  CHECK(ok);
+
+  if (!status.ok()) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
+    return false;
+  }
+
+  if (!response.is_valid()) {
+    LOG(INFO) << "The pulling block not found in all the peers";
+    return false;
+  }
+
+  const size_t num_slots = response.num_slots();
+  const string &block_content = response.block();
+
+  // The reply comes from another process, so its consistency has to be
+  // verified in release builds as well: a payload whose size disagrees with
+  // 'num_slots' would make the copy below overrun or underfill the allocation.
+  if (num_slots == 0 ||
+      block_content.size() % kSlotSizeBytes != 0 ||
+      block_content.size() / kSlotSizeBytes != num_slots) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: malformed response ("
+               << num_slots << " slots, " << block_content.size() << " bytes)";
+    return false;
+  }
+
+  void *block_buffer = storage_manager_->allocateSlots(num_slots, numa_node);
+
+  // memcpy returns its destination, i.e. 'block_buffer'.
+  block_handle->block_memory =
+      std::memcpy(block_buffer, block_content.c_str(), block_content.size());
+  block_handle->block_memory_size = num_slots;
+
+  return true;
+}
+
+// Asks the BlockLocator for the network addresses of the peers listed for
+// 'block': sends a GetPeerDomainNetworkAddressesMessage over the TMB, receives
+// the response, and returns the addresses it contains.
+vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
+  serialization::GetPeerDomainNetworkAddressesMessage request_proto;
+  request_proto.set_block_id(block);
+
+  // Serialize into a temporary flat buffer, which is released as soon as the
+  // TaggedMessage has been constructed from it.
+  const int request_length = request_proto.ByteSize();
+  char *request_bytes = static_cast<char*>(malloc(request_length));
+  CHECK(request_proto.SerializeToArray(request_bytes, request_length));
+
+  TaggedMessage request_message(static_cast<const void*>(request_bytes),
+                                request_length,
+                                kGetPeerDomainNetworkAddressesMessage);
+  free(request_bytes);
+
+  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+            << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+            << "') to BlockLocator";
+
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(request_message)));
+
+  const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+  const TaggedMessage &reply = annotated_message.tagged_message;
+  CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, reply.message_type());
+
+  serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
+  CHECK(response_proto.ParseFromArray(reply.message(), reply.message_bytes()));
+
+  // Copy the repeated field, in order, into the returned vector.
+  const auto &listed_addresses = response_proto.domain_network_addresses();
+  return vector<string>(listed_addresses.begin(), listed_addresses.end());
+}
+
+// Tells the BlockLocator that 'block' has been added to, or deleted from, this
+// StorageManager's domain, depending on 'message_type'. Any other message type
+// is a fatal error.
+void StorageManager::sendBlockLocationMessage(const block_id block,
+                                              const tmb::message_type_id message_type) {
+  if (message_type == kAddBlockLocationMessage) {
+    LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+  } else if (message_type == kDeleteBlockLocationMessage) {
+    LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+  } else {
+    LOG(FATAL) << "Unknown message type " << message_type;
+  }
+
+  serialization::BlockLocationMessage location_proto;
+  location_proto.set_block_id(block);
+  location_proto.set_block_domain(block_domain_);
+
+  // Serialize into a temporary flat buffer, which is released as soon as the
+  // TaggedMessage has been constructed from it.
+  const int location_length = location_proto.ByteSize();
+  char *location_bytes = static_cast<char*>(malloc(location_length));
+  CHECK(location_proto.SerializeToArray(location_bytes, location_length));
+
+  TaggedMessage location_message(static_cast<const void*>(location_bytes),
+                                 location_length,
+                                 message_type);
+  free(location_bytes);
+
+  LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+            << "') sent BlockLocationMessage (typed '" << message_type
+            << "') to BlockLocator";
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         storage_manager_client_id_,
+                                         block_locator_client_id_,
+                                         move(location_message)));
+}
+#endif
+
 StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
     const block_id block, const int numa_node) {
   // The caller of this function holds an exclusive lock on this block/blob's
   // mutex in the lock manager. The caller has ensured that the block is not
   // already loaded before this function gets called.
+  BlockHandle loaded_handle;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  // TODO(quickstep-team): Use a cost model to determine whether to load from
+  // a remote peer or the disk.
+  if (BlockIdUtil::Domain(block) != block_domain_) {
+    LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+      DataExchangerClientAsync client(
+          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+          this);
+
+      if (client.Pull(block, numa_node, &loaded_handle)) {
+        sendBlockLocationMessage(block, kAddBlockLocationMessage);
+        return loaded_handle;
+      }
+    }
+
+    LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+              << " from remote peers, so try to load from disk.";
+  }
+#endif
+
   size_t num_slots = file_manager_->numSlots(block);
   DEBUG_ASSERT(num_slots != 0);
   void *block_buffer = allocateSlots(num_slots, numa_node);
@@ -373,10 +654,15 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
   CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  BlockHandle loaded_handle;
   loaded_handle.block_memory = block_buffer;
   loaded_handle.block_memory_size = num_slots;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  if (bus_) {
+    sendBlockLocationMessage(block, kAddBlockLocationMessage);
+  }
+#endif
+
   return loaded_handle;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 52326c2..7d0661e 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -20,12 +20,21 @@
 
 #include <atomic>
 #include <chrono>
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
+
+#include "catalog/CatalogTypedefs.hpp"
 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManager.hpp"
 #include "storage/StorageBlob.hpp"
@@ -40,6 +49,14 @@
 #include "gflags/gflags.h"
 #include "gtest/gtest_prod.h"
 
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace grpc { class Channel; }
+#endif
+
+namespace tmb { class MessageBus; }
+
 namespace quickstep {
 
 DECLARE_int32(block_domain);
@@ -50,6 +67,11 @@ DECLARE_bool(use_hdfs);
 #endif
 
 class CatalogRelationSchema;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+class PullResponse;
+#endif
+
 class StorageBlockLayout;
 
 /** \addtogroup Storage
@@ -104,6 +126,33 @@ class StorageManager {
                            std::chrono::milliseconds(200))) {
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Constructor for the distributed version. Delegates to the full
+   *        constructor, taking the buffer pool size from
+   *        FLAGS_buffer_pool_slots and using the eviction policy built by
+   *        LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy() with the
+   *        arguments 2 and 200 milliseconds.
+   *
+   * @param storage_path The filesystem directory where blocks have persistent
+   *        storage.
+   * @param block_domain The unique block domain.
+   * @param block_locator_client_id The TMB client ID of the block locator.
+   * @param bus A pointer to the TMB.
+   *
+   * @exception CorruptPersistentStorage The storage directory layout is not
+   *            in the expected format.
+   **/
+  StorageManager(const std::string &storage_path,
+                 const block_id_domain block_domain,
+                 const tmb::client_id block_locator_client_id,
+                 tmb::MessageBus *bus)
+      : StorageManager(storage_path,
+                       block_domain,
+                       FLAGS_buffer_pool_slots,
+                       LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+                           2,
+                           std::chrono::milliseconds(200)),
+                       block_locator_client_id,
+                       bus) {
+  }
+#endif
+
   /**
    * @brief Constructor.
    * @param storage_path The filesystem directory where blocks have persistent
@@ -121,13 +170,17 @@ class StorageManager {
    * @param eviction_policy The eviction policy that the storage manager should
    *                        use to manage the cache. The storage manager takes
    *                        ownership of *eviction_policy.
+   * @param block_locator_client_id The TMB client ID of the block locator.
+   * @param bus A pointer to the TMB.
    * @exception CorruptPersistentStorage The storage directory layout is not
    *            in the expected format.
    **/
   StorageManager(const std::string &storage_path,
                  const block_id_domain block_domain,
                  const size_t max_memory_usage,
-                 EvictionPolicy *eviction_policy);
+                 EvictionPolicy *eviction_policy,
+                 const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
+                 tmb::MessageBus *bus = nullptr);
 
   /**
    * @brief Destructor which also destroys all managed blocks.
@@ -325,6 +378,16 @@ class StorageManager {
    **/
   bool blockOrBlobIsLoadedAndDirty(const block_id block);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Copy a block or a blob held by this StorageManager into an RPC
+   *        response, on behalf of a peer that is pulling it.
+   *
+   * @param block The id of the block or blob.
+   * @param response Where to store the pulled block content. If the block is
+   *        not found in this StorageManager, only its is_valid field is set
+   *        (to false).
+   **/
+  void pullBlockOrBlob(const block_id block, PullResponse *response) const;
+#endif
+
  private:
   struct BlockHandle {
     void *block_memory;
@@ -332,6 +395,63 @@ class StorageManager {
     StorageBlockBase *block;
   };
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief A class which connects to a remote peer's DataExchangerAsync to
+   *        pull blocks and blobs from it.
+   **/
+  class DataExchangerClientAsync {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param channel The RPC channel connected to a DataExchangerAsync.
+     * @param storage_manager The StorageManager from which the memory for
+     *        pulled blocks is allocated.
+     **/
+    DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+                             StorageManager *storage_manager);
+
+    /**
+     * @brief Pull a block or blob from a remote StorageManager.
+     *
+     * @param block The block or blob to pull.
+     * @param numa_node The NUMA node for placing this block.
+     * @param block_handle Where to store the memory and the size (in slots) of
+     *        the pulled block or blob. Its 'block' member is not set.
+     *
+     * @return Whether the pull operation is successful or not.
+     **/
+    bool Pull(const block_id block,
+              const numa_node_id numa_node,
+              BlockHandle *block_handle);
+
+   private:
+    std::unique_ptr<DataExchange::Stub> stub_;
+
+    // Not owned.
+    StorageManager *storage_manager_;
+
+    DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
+  };
+
+  /**
+   * @brief Get the network info of all the remote StorageManagers which may
+   *        have loaded the given block in their buffer pools, by querying the
+   *        BlockLocator over the TMB.
+   *
+   * @param block The block or blob to pull.
+   *
+   * @return The network info of all the possible peers to pull from.
+   **/
+  std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+
+  /**
+   * @brief Update the block location info in BlockLocator.
+   *
+   * @param block The given block or blob.
+   * @param message_type Indicate whether to add or delete a block location:
+   *        either kAddBlockLocationMessage or kDeleteBlockLocationMessage.
+   *        Any other value is a fatal error.
+   **/
+  void sendBlockLocationMessage(const block_id block,
+                                const tmb::message_type_id message_type);
+#endif
+
   // Helper for createBlock() and createBlob(). Allocates a block ID and memory
   // slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
   // writes the allocated slot range into 'handle->slot_index_low' and
@@ -459,6 +579,15 @@ class StorageManager {
 
   std::unique_ptr<EvictionPolicy> eviction_policy_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  const block_id_domain block_domain_;
+
+  tmb::client_id storage_manager_client_id_;
+#endif
+
+  const tmb::client_id block_locator_client_id_;
+  tmb::MessageBus *bus_;
+
   std::unique_ptr<FileManager> file_manager_;
 
   // Used to generate unique IDs in allocateNewBlockOrBlob().

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
new file mode 100644
index 0000000..83309b1
--- /dev/null
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -0,0 +1,239 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/CountedReference.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+/**
+ * @brief Test fixture that connects a BlockLocator, a DataExchangerAsync and
+ *        two StorageManagers to one message bus.
+ *
+ * @note 'storage_manager_expected_' is registered under the network address of
+ *       'data_exchanger_expected_', while 'storage_manager_checked_' is
+ *       registered under kCheckedDomainNetworkAddress. Both StorageManagers
+ *       use the same kStoragePath.
+ **/
+class DataExchangeTest : public ::testing::Test {
+ protected:
+  // Storage path handed to both StorageManagers.
+  static const char kStoragePath[];
+  // Network address used to register the block domain of
+  // 'storage_manager_checked_'.
+  static const char kCheckedDomainNetworkAddress[];
+
+  // Joins the data exchanger and the BlockLocator. TearDown() has already
+  // called shutdown() on the exchanger and sent kPoisonMessage to the locator.
+  ~DataExchangeTest() {
+    data_exchanger_expected_.join();
+    locator_->join();
+  }
+
+  // Builds the whole topology for one test case. The statement order matters:
+  // the locator is started before any block domain is requested from it.
+  virtual void SetUp() {
+    bus_.Initialize();
+
+    // Create and start the BlockLocator, remembering its bus client id.
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+
+    // Connect the fixture's own bus client ("worker"). It sends block domain
+    // registrations and receives the corresponding responses.
+    worker_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    // The worker also sends the poison message in TearDown().
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+    // The "expected" StorageManager gets a block domain registered under the
+    // data exchanger's network address.
+    storage_manager_expected_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(data_exchanger_expected_.network_address()),
+        locator_client_id_,
+        &bus_));
+
+    // Attach the data exchanger to the "expected" StorageManager, then start it.
+    data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
+    data_exchanger_expected_.start();
+
+    // The "checked" StorageManager gets its own block domain, registered under
+    // kCheckedDomainNetworkAddress. No data exchanger is attached to it here.
+    storage_manager_checked_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(kCheckedDomainNetworkAddress),
+        locator_client_id_,
+        &bus_));
+  }
+
+  // Tears the topology down: destroys the "checked" StorageManager, shuts the
+  // data exchanger down, destroys the "expected" StorageManager, and finally
+  // sends kPoisonMessage to the BlockLocator.
+  virtual void TearDown() {
+    storage_manager_checked_.reset();
+
+    data_exchanger_expected_.shutdown();
+    storage_manager_expected_.reset();
+
+    serialization::EmptyMessage proto;
+
+    // Serialize the (empty) payload into a temporary heap buffer.
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    // NOTE(review): the buffer is freed right after the TaggedMessage is
+    // built, so TaggedMessage presumably copies the bytes -- confirm.
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kPoisonMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    // NOTE(review): presumably the poison message makes the locator stop so
+    // that the join() in the destructor returns -- confirm in BlockLocator.
+    CHECK(tmb::MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+  }
+
+  // The two StorageManagers exercised by the test cases.
+  unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
+
+ private:
+  // Sends a BlockDomainRegistrationMessage carrying 'network_address' from the
+  // worker client to the BlockLocator, receives the response on the worker
+  // client, and returns the block domain contained in that response.
+  block_id_domain getBlockDomain(const string &network_address) {
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    // Serialize the request into a temporary heap buffer.
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    // NOTE(review): the buffer is freed right after the TaggedMessage is
+    // built, so TaggedMessage presumably copies the bytes -- confirm.
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainRegistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(tmb::MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+
+    // Receive the reply on the worker client and check its sender and type.
+    // NOTE(review): the trailing '0, true' arguments are presumably the
+    // minimum priority and a 'delete immediately' flag -- confirm against TMB.
+    const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage (typed '"
+              << kBlockDomainRegistrationResponseMessage
+              << "') from BlockLocator";
+
+    quickstep::serialization::BlockDomainRegistrationResponseMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  // Message bus shared by the locator, the worker client and both
+  // StorageManagers.
+  MessageBusImpl bus_;
+
+  // The BlockLocator under test and its bus client id.
+  unique_ptr<BlockLocator> locator_;
+  tmb::client_id locator_client_id_;
+
+  // Bus client id used by the fixture itself to talk to the locator.
+  tmb::client_id worker_client_id_;
+
+  // Data exchanger attached to 'storage_manager_expected_'.
+  DataExchangerAsync data_exchanger_expected_;
+};
+
+const char DataExchangeTest::kStoragePath[] = "./data_exchange_test_data/";
+const char DataExchangeTest::kCheckedDomainNetworkAddress[] = "0.0.0.0:0";
+
+TEST_F(DataExchangeTest, BlockPull) {
+  // A relation with a single int attribute describes the block under test.
+  CatalogRelation relation(nullptr, "rel");
+  CatalogAttribute *int_attribute = new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt));
+  const attribute_id attr_id = relation.addAttribute(int_attribute);
+
+  // The block is created by the "expected" StorageManager.
+  const block_id block = storage_manager_expected_->createBlock(
+      relation, relation.getDefaultStorageBlockLayout());
+
+  // Both block references are scoped; they are released before the block is
+  // deleted below.
+  {
+    MutableBlockReference block_expected = storage_manager_expected_->getBlockMutable(block, relation);
+
+    // Insert one tuple holding 'value_expected'.
+    const int value_expected = -1;
+    {
+      vector<TypedValue> attribute_values;
+      attribute_values.push_back(TypedValue(value_expected));
+
+      const Tuple single_tuple(move(attribute_values));
+      EXPECT_TRUE(block_expected->insertTuple(single_tuple));
+    }
+
+    // Fetch the same block id through the "checked" StorageManager.
+    const BlockReference block_checked = storage_manager_checked_->getBlock(block, relation);
+    EXPECT_FALSE(block_checked->isBlob());
+
+    // The fetched block must contain exactly the tuple inserted above.
+    const TupleStorageSubBlock &tuple_store_checked = block_checked->getTupleStorageSubBlock();
+    EXPECT_EQ(1, tuple_store_checked.numTuples());
+
+    const TypedValue value_checked =
+        tuple_store_checked.getAttributeValueTyped(0 /* tuple_id */, attr_id);
+    EXPECT_EQ(value_expected, value_checked.getLiteral<int>());
+  }
+
+  // Clean up through both StorageManagers, "checked" first.
+  storage_manager_checked_->deleteBlockOrBlobFile(block);
+  storage_manager_expected_->deleteBlockOrBlobFile(block);
+}
+
+TEST_F(DataExchangeTest, BlobPull) {
+  // The blob is created by the "expected" StorageManager.
+  const block_id blob_id = storage_manager_expected_->createBlob(kDefaultBlockSizeInSlots);
+
+  // Both blob references are scoped; they are released before the blob is
+  // deleted below.
+  {
+    // Hold a reference through the "expected" StorageManager while the same
+    // blob id is fetched through the "checked" one.
+    const BlobReference expected_ref = storage_manager_expected_->getBlob(blob_id);
+    const BlobReference checked_ref = storage_manager_checked_->getBlob(blob_id);
+
+    EXPECT_TRUE(checked_ref->isBlob());
+    EXPECT_EQ(blob_id, checked_ref->getID());
+  }
+
+  // Clean up through both StorageManagers, "checked" first.
+  storage_manager_checked_->deleteBlockOrBlobFile(blob_id);
+  storage_manager_expected_->deleteBlockOrBlobFile(blob_id);
+}
+
+}  // namespace quickstep
+
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Let gtest strip its own --gtest_* flags first; gflags rejects unknown flags.
+  ::testing::InitGoogleTest(&argc, argv);
+
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/third_party/iwyu/iwyu_helper.py
----------------------------------------------------------------------
diff --git a/third_party/iwyu/iwyu_helper.py b/third_party/iwyu/iwyu_helper.py
index a204c50..1d3a76f 100755
--- a/third_party/iwyu/iwyu_helper.py
+++ b/third_party/iwyu/iwyu_helper.py
@@ -21,6 +21,7 @@ QUICKSTEP_INCLUDES = [ '.',
                        './build/third_party',
                        './build/third_party/protobuf/include',
                        './build/third_party/gflags/include',
+                       './build/third_party/tmb/include',
                        './third_party/benchmark/include',
                        './third_party/glog/src',
                        './third_party/googletest/googletest/include',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3a46073d/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index c7b5883..7dd6fc5 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -17,7 +17,7 @@ TODO List / Known Issues & Limitations:
 """
 
 #   Copyright 2011-2015 Quickstep Technologies LLC.
-#   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2015-2016 Pivotal Software, Inc.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
 # Explicitly ignored dependencies (special headers with no other quickstep
 # dependencies).
 IGNORED_DEPENDENCIES = frozenset(
-    ["quickstep_threading_WinThreadsAPI",
+    ["quickstep_storage_DataExchange.grpc_proto",
+     "quickstep_threading_WinThreadsAPI",
      "quickstep_utility_textbasedtest_TextBasedTest",
      "quickstep_utility_textbasedtest_TextBasedTestDriver",
      "quickstep_storage_bitweaving_BitWeavingHIndexSubBlock",