You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/29 05:54:42 UTC

incubator-quickstep git commit: Added Async DataExchange Service.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/async-dex [created] 492ffa1e8


Added Async DataExchange Service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/492ffa1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/492ffa1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/492ffa1e

Branch: refs/heads/async-dex
Commit: 492ffa1e868d489d81bb1489ad8605f962d312ed
Parents: 6dcb0fc
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Sat May 28 22:55:05 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Sat May 28 22:55:22 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                          |  11 +-
 storage/CMakeLists.txt                  |  69 +++++++-
 storage/DataExchange.proto              |  31 ++++
 storage/DataExchangerAsync.cpp          | 165 ++++++++++++++++++
 storage/DataExchangerAsync.hpp          |  97 +++++++++++
 storage/StorageManager.cpp              | 113 ++++++++++++-
 storage/StorageManager.hpp              |  61 ++++++-
 storage/tests/DataExchange_unittest.cpp | 240 +++++++++++++++++++++++++++
 third_party/iwyu/iwyu_helper.py         |   3 +-
 validate_cmakelists.py                  |   5 +-
 10 files changed, 787 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 87a8f7c..d0fd975 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -659,7 +659,12 @@ set(ENABLE_PUREMEMORY ON CACHE BOOL "Enable PureMemory TMB")
 set(ENABLE_LEVELDB OFF CACHE BOOL "Enable LevelDB TMB")
 set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
 set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
-set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+
+# The distributed version requires the NativeNet implementation.
+if (NOT ENABLE_DISTRIBUTED)
+  set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+endif()
+
 set(ENABLE ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
 set(ENABLE_VOLTDB OFF CACHE BOOL "Enable VoltDB TMB")
 set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
@@ -667,6 +672,10 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
 add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
 include_directories(${TMB_INCLUDE_DIRS})
 
+if (ENABLE_DISTRIBUTED)
+  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+endif()
+
 # Add all of the module subdirectories. CMakeLists.txt in each of the subdirectories
 # defines how to build that module's libraries.
 add_subdirectory(catalog)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 2efc1ae..6bcbd41 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -126,6 +126,13 @@ QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
                          storage_StorageBlockLayout_proto_hdrs
                          StorageBlockLayout.proto)
 
+if (ENABLE_DISTRIBUTED)
+  GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
+                    storage_DataExchange_proto_hdrs
+                    .
+                    DataExchange.proto)
+endif()
+
 # Declare micro-libs:
 add_library(quickstep_storage_AggregationOperationState
             AggregationOperationState.cpp
@@ -171,6 +178,14 @@ add_library(quickstep_storage_CompressedTupleStorageSubBlock
             CompressedTupleStorageSubBlock.hpp)
 add_library(quickstep_storage_CountedReference ../empty_src.cpp CountedReference.hpp)
 add_library(quickstep_storage_CSBTreeIndexSubBlock CSBTreeIndexSubBlock.cpp CSBTreeIndexSubBlock.hpp)
+
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_storage_DataExchange_proto
+              ${storage_DataExchange_proto_srcs}
+              ${storage_DataExchange_proto_hdrs})
+  add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
+endif()
+
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
@@ -573,6 +588,19 @@ target_link_libraries(quickstep_storage_CSBTreeIndexSubBlock
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector
                       quickstep_utility_ScopedBuffer)
+
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage_DataExchange_proto
+                        ${PROTOBUF3_LIBRARY})
+  target_link_libraries(quickstep_storage_DataExchangerAsync
+                        glog
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_utility_Macros
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif()
+
 target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageConstants
@@ -913,6 +941,7 @@ target_link_libraries(quickstep_storage_StorageManager
                       gflags_nothreads-static
                       glog
                       gtest
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_storage_CountedReference
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
@@ -943,7 +972,9 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_storage_StorageManager
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil)
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_DataExchange_proto
+                        ${GRPCPLUSPLUS_LIBRARIES})
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_storage_SubBlockTypeRegistry
                       glog
@@ -1058,6 +1089,11 @@ elseif (QUICKSTEP_HAVE_FILE_MANAGER_WINDOWS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerWindows)
 endif()
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_DataExchangerAsync)
+endif()
 # CMAKE_VALIDATE_IGNORE_BEGIN
 if(QUICKSTEP_HAVE_BITWEAVING)
   target_link_libraries(quickstep_storage
@@ -1327,6 +1363,37 @@ target_link_libraries(CompressedPackedRowStoreTupleStorageSubBlock_unittest
                       ${LIBS})
 add_test(CompressedPackedRowStoreTupleStorageSubBlock_unittest CompressedPackedRowStoreTupleStorageSubBlock_unittest)
 
+if (ENABLE_DISTRIBUTED)
+  add_executable(DataExchange_unittest
+                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataExchange_unittest.cpp")
+  target_link_libraries(DataExchange_unittest
+                        gflags_nothreads-static
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogAttribute
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_CountedReference
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlob
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypeFactory
+                        quickstep_types_TypeID
+                        quickstep_types_TypedValue
+                        quickstep_types_containers_Tuple
+                        tmb
+                        ${LIBS})
+  add_test(DataExchange_unittest DataExchange_unittest)
+endif(ENABLE_DISTRIBUTED)
+
 add_executable(CSBTreeIndexSubBlock_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/CSBTreeIndexSubBlock_unittest.cpp")
 target_link_libraries(CSBTreeIndexSubBlock_unittest
                       gtest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/DataExchange.proto
----------------------------------------------------------------------
diff --git a/storage/DataExchange.proto b/storage/DataExchange.proto
new file mode 100644
index 0000000..a2636e5
--- /dev/null
+++ b/storage/DataExchange.proto
@@ -0,0 +1,31 @@
+//   Copyright 2016 Pivotal Software, Inc.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto3";
+
+package quickstep;
+
+service DataExchange {
+  rpc Pull (PullRequest) returns (PullResponse) {}
+}
+
+message PullRequest {
+  fixed64 block_id = 1;
+}
+
+message PullResponse {
+  bool is_valid = 1;
+  uint64 num_slots = 2;
+  bytes block = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
new file mode 100644
index 0000000..78c6565
--- /dev/null
+++ b/storage/DataExchangerAsync.cpp
@@ -0,0 +1,165 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "storage/DataExchangerAsync.hpp"
+
+#include <grpc++/grpc++.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#include "storage/StorageManager.hpp"
+
+#include "glog/logging.h"
+
+using grpc::ServerCompletionQueue;
+
+namespace quickstep {
+namespace {
+
+/**
+ * @brief RPC Request Context Instance.
+ **/
+class CallContext {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param service The async service.
+   * @param queue The RPC request queue.
+   * @param storage_manager The StorageManager to use.
+   **/
+  CallContext(DataExchange::AsyncService *service,
+              ServerCompletionQueue *queue,
+              StorageManager *storage_manager)
+      : service_(service),
+        queue_(queue),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        responder_(&context_),
+        status_(CallStatus::CREATE) {
+    Proceed();
+  }
+
+  /**
+   * @brief Process the RPC request.
+   **/
+  void Proceed();
+
+ private:
+  DataExchange::AsyncService *service_;
+  ServerCompletionQueue *queue_;
+
+  StorageManager *storage_manager_;
+
+  grpc::ServerContext context_;
+
+  PullRequest request_;
+  PullResponse response_;
+
+  grpc::ServerAsyncResponseWriter<PullResponse> responder_;
+
+  enum class CallStatus {
+    CREATE = 0,
+    PROCESS,
+    FINISH,
+  };
+  CallStatus status_;
+};
+
+void CallContext::Proceed() {
+  switch (status_) {
+    case CallStatus::CREATE: {
+      // Advance this instance to the PROCESS state.
+      status_ = CallStatus::PROCESS;
+
+      // As part of the initial CREATE state, we *request* that the system
+      // start processing Pull requests. In this request, "this" acts as
+      // the tag uniquely identifying the request (so that different CallContext
+      // instances can serve different requests concurrently), in this case
+      // the memory address of this CallContext instance.
+      service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this);
+      break;
+    }
+    case CallStatus::PROCESS: {
+      // Spawn a new CallContext instance to serve new clients while we process
+      // the one for this CallContext. The instance will deallocate itself as
+      // part of its FINISH state.
+      new CallContext(service_, queue_, storage_manager_);
+
+      // The actual processing.
+      storage_manager_->pullBlockOrBlob(request_.block_id(), &response_);
+
+      // And we are done! Let the gRPC runtime know we've finished, using the
+      // memory address of this instance as the uniquely identifying tag for
+      // the event.
+      status_ = CallStatus::FINISH;
+      responder_.Finish(response_, grpc::Status::OK, this);
+      break;
+    }
+    case CallStatus::FINISH: {
+      // Once in the FINISH state, deallocate ourselves (CallContext).
+      delete this;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown call status.";
+  }
+}
+
+}  // namespace
+
+const char *DataExchangerAsync::kLocalNetworkAddress = "0.0.0.0:";
+
+DataExchangerAsync::DataExchangerAsync() {
+  grpc::ServerBuilder builder;
+  builder.AddListeningPort(kLocalNetworkAddress, grpc::InsecureServerCredentials(), &port_);
+  builder.RegisterService(&service_);
+
+  queue_ = builder.AddCompletionQueue();
+  server_ = builder.BuildAndStart();
+
+  DCHECK_GT(port_, 0);
+  server_address_ = kLocalNetworkAddress + std::to_string(port_);
+  LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_;
+}
+
+void DataExchangerAsync::run() {
+  // Seed the queue with the first CallContext, which frees itself once its
+  // request has been served successfully.
+  new CallContext(&service_, queue_.get(), storage_manager_);
+
+  void *event_tag = nullptr;  // Identifies the CallContext owning the event.
+  bool event_ok = false;
+
+  // Keep draining events until Next() reports that the queue is finished.
+  while (queue_->Next(&event_tag, &event_ok)) {
+    CallContext *owner = static_cast<CallContext*>(event_tag);
+    if (!event_ok) {
+      LOG(WARNING) << "Not ok\n";
+      delete owner;
+      continue;
+    }
+
+    owner->Proceed();
+  }
+
+  LOG(INFO) << "Shutdown\n";
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/DataExchangerAsync.hpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.hpp b/storage/DataExchangerAsync.hpp
new file mode 100644
index 0000000..75a4e4d
--- /dev/null
+++ b/storage/DataExchangerAsync.hpp
@@ -0,0 +1,97 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+#define QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+
+#include <grpc++/grpc++.h>
+
+#include <memory>
+#include <string>
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/**
+ * @brief A class which exchanges data from a StorageManager to its peer.
+ **/
+class DataExchangerAsync final : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  DataExchangerAsync();
+
+  ~DataExchangerAsync() override {}
+
+  /**
+   * @brief Set the local StorageManager.
+   *
+   * @param storage_manager The StorageManager to use.
+   **/
+  void set_storage_manager(StorageManager *storage_manager) {
+    storage_manager_ = storage_manager;
+  }
+
+  /**
+   * @brief Return its network address for peers to connect.
+   *
+   * @return Its network address.
+   **/
+  std::string network_address() const {
+    DCHECK(!server_address_.empty());
+    return server_address_;
+  }
+
+  /**
+   * @brief Shutdown itself.
+   **/
+  void shutdown() {
+    server_->Shutdown();
+    // Always shutdown the completion queue after the server.
+    queue_->Shutdown();
+  }
+
+ protected:
+  void run() override;
+
+ private:
+  static const char *kLocalNetworkAddress;
+
+  DataExchange::AsyncService service_;
+
+  int port_ = -1;
+  // Format IP:port, e.g., "0.0.0.0:0".
+  std::string server_address_;
+
+  std::unique_ptr<grpc::ServerCompletionQueue> queue_;
+  std::unique_ptr<grpc::Server> server_;
+
+  StorageManager *storage_manager_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(DataExchangerAsync);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 99ce96d..2d7bfbc 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -41,6 +41,10 @@
 #include <numaif.h>
 #endif
 
+#ifdef QUICKSTEP_DISTRIBUTED
+#include <grpc++/grpc++.h>
+#endif
+
 #include <atomic>
 #include <cerrno>
 #include <cstddef>
@@ -53,6 +57,8 @@
 #include <unordered_map>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
+
 #ifdef QUICKSTEP_DISTRIBUTED
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -60,6 +66,12 @@
 #endif
 
 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManagerLocal.hpp"
 #include "storage/StorageBlob.hpp"
@@ -455,6 +467,80 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
 }
 
 #ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::pullBlockOrBlob(const block_id block,
+                                     PullResponse *response) const {
+  // Hold the shared lock on the block map for the whole lookup and copy.
+  SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
+  const auto found = blocks_.find(block);
+  if (found == blocks_.end()) {
+    // Not in the block map: report a miss and leave the other fields unset.
+    response->set_is_valid(false);
+    return;
+  }
+
+  const BlockHandle &handle = found->second;
+  const std::size_t slot_count = handle.block_memory_size;
+  response->set_is_valid(true);
+  response->set_num_slots(slot_count);
+  response->set_block(handle.block_memory, slot_count * kSlotSizeBytes);
+}
+
+StorageManager::DataExchangerClientAsync::DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+                                                                   StorageManager *storage_manager)
+    : stub_(DataExchange::NewStub(channel)),
+      storage_manager_(storage_manager) {
+}
+
+bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
+                                                    const numa_node_id numa_node,
+                                                    BlockHandle *block_handle) {
+  // Issues one Pull RPC and blocks on a private completion queue for the
+  // result. Returns false, leaving 'block_handle' untouched, on any failure.
+  grpc::ClientContext context;
+  PullRequest request;
+  request.set_block_id(block);
+
+  grpc::CompletionQueue queue;
+  unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+      stub_->AsyncPull(&context, request, &queue));
+  PullResponse response;
+  grpc::Status status;
+  void *const expected_tag = reinterpret_cast<void*>(1);
+  rpc->Finish(&response, &status, expected_tag);
+
+  // Check Next() itself so a failed wait cannot yield an indeterminate tag.
+  void *got_tag = nullptr;
+  bool ok = false;
+  CHECK(queue.Next(&got_tag, &ok));
+  CHECK(got_tag == expected_tag);
+  CHECK(ok);
+
+  if (!status.ok()) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
+    return false;
+  }
+
+  if (!response.is_valid()) {
+    LOG(INFO) << "The pulling block not found in all the peers";
+    return false;
+  }
+
+  // The payload comes from a remote peer, so validate it in release builds
+  // too: a size mismatch would make the memcpy below overrun the new buffer.
+  const size_t num_slots = response.num_slots();
+  const string &block_content = response.block();
+  if (num_slots == 0u || block_content.size() / kSlotSizeBytes != num_slots ||
+      block_content.size() % kSlotSizeBytes != 0u) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: malformed response";
+    return false;
+  }
+
+  void *block_buffer = storage_manager_->allocateSlots(num_slots, numa_node);
+  block_handle->block_memory = std::memcpy(block_buffer, block_content.c_str(), block_content.size());
+  block_handle->block_memory_size = num_slots;
+  return true;
+}
+
 vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);
@@ -540,14 +626,37 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // The caller of this function holds an exclusive lock on this block/blob's
   // mutex in the lock manager. The caller has ensured that the block is not
   // already loaded before this function gets called.
-  size_t num_slots = file_manager_->numSlots(block);
+  BlockHandle loaded_handle;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  // TODO(quickstep-team): Use a cost model to determine whether to load from
+  // a remote peer or the disk.
+  if (BlockIdUtil::Domain(block) != block_domain_) {
+    LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+      DataExchangerClientAsync client(
+          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+          this);
+
+      if (client.Pull(block, numa_node, &loaded_handle)) {
+        sendBlockLocationMessage(block, kAddBlockLocationMessage);
+        return loaded_handle;
+      }
+    }
+
+    LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+              << " from remote peers, so try to load from disk.";
+  }
+#endif
+
+  const size_t num_slots = file_manager_->numSlots(block);
   DEBUG_ASSERT(num_slots != 0);
   void *block_buffer = allocateSlots(num_slots, numa_node);
 
   const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
   CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  BlockHandle loaded_handle;
   loaded_handle.block_memory = block_buffer;
   loaded_handle.block_memory_size = num_slots;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 55a011e..50ddb0f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -26,9 +26,14 @@
 #include <unordered_map>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
-
 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManager.hpp"
 #include "storage/StorageBlob.hpp"
@@ -45,6 +50,10 @@
 
 #include "tmb/id_typedefs.h"
 
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace grpc { class Channel; }
+#endif
+
 namespace tmb { class MessageBus; }
 
 namespace quickstep {
@@ -58,6 +67,10 @@ DECLARE_bool(use_hdfs);
 
 class CatalogRelationSchema;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+class PullResponse;
+#endif
+
 class StorageBlockLayout;
 
 /** \addtogroup Storage
@@ -365,6 +378,16 @@ class StorageManager {
    **/
   bool blockOrBlobIsLoadedAndDirty(const block_id block);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Pull a block or a blob. Used by DataExchangerAsync.
+   *
+   * @param block The id of the block or blob.
+   * @param response Where to store the pulled block content.
+   **/
+  void pullBlockOrBlob(const block_id block, PullResponse *response) const;
+#endif
+
  private:
   struct BlockHandle {
     void *block_memory;
@@ -374,6 +397,42 @@ class StorageManager {
 
 #ifdef QUICKSTEP_DISTRIBUTED
   /**
+   * @brief A class which connects to DataExchangerAsync to exchange data from
+   *        remote peers.
+   **/
+  class DataExchangerClientAsync {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param channel The RPC channel to connect DataExchangerAsync.
+     * @param storage_manager The StorageManager to use.
+     */
+    DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+                             StorageManager *storage_manager);
+
+    /**
+     * @brief Pull a block or blob from a remote StorageManager.
+     *
+     * @param block The block or blob to pull.
+     * @param numa_node The NUMA node for placing this block.
+     * @param block_handle Where the pulled block or blob stores.
+     *
+     * @return Whether the pull operation is successful or not.
+     */
+    bool Pull(const block_id block,
+              const numa_node_id numa_node,
+              BlockHandle *block_handle);
+
+   private:
+    std::unique_ptr<DataExchange::Stub> stub_;
+
+    StorageManager *storage_manager_;
+
+    DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
+  };
+
+  /**
    * @brief Get the network info of all the remote StorageManagers which may
    *        load the given block in the buffer pool.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
new file mode 100644
index 0000000..38d12f6
--- /dev/null
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -0,0 +1,240 @@
+/**
+ *   Copyright 2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/CountedReference.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class DataExchangeTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+  static const char kCheckedDomainNetworkAddress[];
+
+  ~DataExchangeTest() {
+    data_exchanger_expected_.join();
+    locator_->join();
+  }
+
+  virtual void SetUp() {
+    bus_.Initialize();
+
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+
+    worker_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+    storage_manager_expected_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(data_exchanger_expected_.network_address()),
+        locator_client_id_,
+        &bus_));
+
+    data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
+    data_exchanger_expected_.start();
+
+    storage_manager_checked_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(kCheckedDomainNetworkAddress),
+        locator_client_id_,
+        &bus_));
+  }
+
+  virtual void TearDown() {
+    storage_manager_checked_.reset();
+
+    data_exchanger_expected_.shutdown();
+    storage_manager_expected_.reset();
+
+    serialization::EmptyMessage proto;
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kPoisonMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+  }
+
+  unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
+
+ private:
+  // Registers 'network_address' with the BlockLocator and returns the block
+  // domain carried in the locator's response.
+  //
+  // A BlockDomainRegistrationMessage is sent from the worker client to the
+  // locator client over bus_; the reply is then received on the worker client
+  // and parsed as a BlockDomainMessage.
+  block_id_domain getBlockDomain(const string &network_address) {
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    // Serialize into a std::string so that the buffer is owned by an RAII
+    // object rather than an unchecked malloc()/free() pair.
+    string proto_bytes;
+    CHECK(proto.SerializeToString(&proto_bytes));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                          proto_bytes.size(),
+                          kBlockDomainRegistrationMessage);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+
+    // NOTE(review): presumably this call blocks until the locator's reply is
+    // available -- confirm against the TMB MessageBus::Receive() contract.
+    const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+    // The reply must come from the locator and carry the response type.
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage (typed '"
+              << kBlockDomainRegistrationResponseMessage
+              << "') from BlockLocator";
+
+    serialization::BlockDomainMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  // Message bus shared by the fixture: it is passed to both StorageManagers
+  // and used to exchange messages with the BlockLocator.
+  MessageBusImpl bus_;
+
+  // The BlockLocator and the bus client id that registration and poison
+  // messages are addressed to.
+  unique_ptr<BlockLocator> locator_;
+  tmb::client_id locator_client_id_;
+
+  // Bus client id the fixture itself sends from and receives replies on.
+  tmb::client_id worker_client_id_;
+
+  // Exchanger attached to storage_manager_expected_ in SetUp(); its
+  // network_address() is what gets registered as that manager's block domain.
+  DataExchangerAsync data_exchanger_expected_;
+};
+
+// Storage path handed to both StorageManagers created in SetUp().
+const char DataExchangeTest::kStoragePath[] = "./data_exchange_test_data/";
+// Network address registered to obtain the block domain of
+// storage_manager_checked_.
+// NOTE(review): "ip:port" looks like a placeholder rather than a reachable
+// address -- confirm that nothing is expected to connect to it.
+const char DataExchangeTest::kCheckedDomainNetworkAddress[] = "ip:port";
+
+// Creates a block holding one integer tuple through storage_manager_expected_
+// and checks that the same block id, fetched through
+// storage_manager_checked_, yields a non-blob block with that single tuple.
+TEST_F(DataExchangeTest, BlockPull) {
+  // A throw-away relation with a single integer attribute.
+  CatalogRelation test_relation(nullptr, "rel");
+  const auto &int_type = TypeFactory::GetType(kInt);
+  const attribute_id attr_id =
+      test_relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", int_type));
+
+  const auto &default_layout = test_relation.getDefaultStorageBlockLayout();
+  const block_id block_under_test =
+      storage_manager_expected_->createBlock(test_relation, default_layout);
+
+  {
+    MutableBlockReference block_expected(
+        storage_manager_expected_->getBlockMutable(block_under_test, test_relation));
+
+    // Store exactly one tuple carrying 'value_expected'.
+    const int value_expected = -1;
+    {
+      vector<TypedValue> single_value;
+      single_value.push_back(TypedValue(value_expected));
+      const Tuple tuple(move(single_value));
+
+      EXPECT_TRUE(block_expected->insertTuple(tuple));
+    }
+
+    // Look the block up through the other StorageManager and compare its
+    // contents with what was inserted above.
+    const BlockReference block_checked(
+        storage_manager_checked_->getBlock(block_under_test, test_relation));
+    EXPECT_FALSE(block_checked->isBlob());
+
+    const TupleStorageSubBlock &tuple_store_checked = block_checked->getTupleStorageSubBlock();
+
+    EXPECT_EQ(1, tuple_store_checked.numTuples());
+    EXPECT_EQ(value_expected,
+              tuple_store_checked.getAttributeValueTyped(0 /* tuple_id */, attr_id).getLiteral<int>());
+  }
+
+  // Both references are out of scope here; remove the block on each side.
+  storage_manager_checked_->deleteBlockOrBlobFile(block_under_test);
+  storage_manager_expected_->deleteBlockOrBlobFile(block_under_test);
+}
+
+// Creates a blob through storage_manager_expected_ and checks that the same
+// id, fetched through storage_manager_checked_, is a blob with a matching id.
+TEST_F(DataExchangeTest, BlobPull) {
+  const block_id blob(storage_manager_expected_->createBlob(kDefaultBlockSizeInSlots));
+
+  {
+    // Hold a reference on each side while the checks run.
+    const BlobReference blob_at_source(storage_manager_expected_->getBlob(blob));
+    const BlobReference blob_checked(storage_manager_checked_->getBlob(blob));
+
+    EXPECT_TRUE(blob_checked->isBlob());
+    EXPECT_EQ(blob, blob_checked->getID());
+  }
+
+  // Both references are out of scope here; remove the blob on each side.
+  storage_manager_checked_->deleteBlockOrBlobFile(blob);
+  storage_manager_expected_->deleteBlockOrBlobFile(blob);
+}
+
+}  // namespace quickstep
+
+// Test entry point: initializes logging, Google Test and gflags, then runs
+// every registered test and returns the Google Test exit status.
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+
+  // Let Google Test strip its own --gtest_* flags from argv before gflags
+  // parses the command line; gflags treats flags it does not know as an
+  // error and exits, which would make e.g. --gtest_filter unusable.
+  ::testing::InitGoogleTest(&argc, argv);
+
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/third_party/iwyu/iwyu_helper.py
----------------------------------------------------------------------
diff --git a/third_party/iwyu/iwyu_helper.py b/third_party/iwyu/iwyu_helper.py
index a204c50..13697be 100755
--- a/third_party/iwyu/iwyu_helper.py
+++ b/third_party/iwyu/iwyu_helper.py
@@ -19,8 +19,9 @@ import sys
 QUICKSTEP_INCLUDES = [ '.',
                        './build',
                        './build/third_party',
-                       './build/third_party/protobuf/include',
                        './build/third_party/gflags/include',
+                       './build/third_party/protobuf/include',
+                       './build/third_party/tmb/include',
                        './third_party/benchmark/include',
                        './third_party/glog/src',
                        './third_party/googletest/googletest/include',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/492ffa1e/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index c7b5883..7dd6fc5 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -17,7 +17,7 @@ TODO List / Known Issues & Limitations:
 """
 
 #   Copyright 2011-2015 Quickstep Technologies LLC.
-#   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2015-2016 Pivotal Software, Inc.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
 # Explicitly ignored dependencies (special headers with no other quickstep
 # dependencies).
 IGNORED_DEPENDENCIES = frozenset(
-    ["quickstep_threading_WinThreadsAPI",
+    ["quickstep_storage_DataExchange.grpc_proto",
+     "quickstep_threading_WinThreadsAPI",
      "quickstep_utility_textbasedtest_TextBasedTest",
      "quickstep_utility_textbasedtest_TextBasedTestDriver",
      "quickstep_storage_bitweaving_BitWeavingHIndexSubBlock",