You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/15 09:39:17 UTC

incubator-quickstep git commit: Refactored the data exchange process.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/refactor-data-exchange [created] 9fae916de


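Refactored the data exchange process: StorageManager now loads a block from local persistent storage when it is present there, and otherwise pulls it from the peer owning the block's domain, whose network address is fetched from BlockLocator (via the new GetAllDomainNetworkAddresses messages) and cached.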


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9fae916d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9fae916d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9fae916d

Branch: refs/heads/refactor-data-exchange
Commit: 9fae916de2a5efe97d1050d99340f59c5028df63
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Mar 15 02:39:02 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 15 02:39:02 2017 -0700

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp             | 28 +++++++
 query_execution/BlockLocator.hpp             |  4 +
 query_execution/QueryExecutionMessages.proto |  4 +
 query_execution/QueryExecutionTypedefs.hpp   |  2 +
 query_execution/QueryExecutionUtil.hpp       |  2 +
 storage/StorageManager.cpp                   | 99 +++++++++++++++++------
 storage/StorageManager.hpp                   | 13 +++
 7 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 765021e..94a0263 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -137,6 +137,10 @@ void BlockLocator::run() {
         processLocateBlockMessage(sender, proto.block_id());
         break;
       }
+      case kGetAllDomainNetworkAddressesMessage: {
+        processGetAllDomainNetworkAddressesMessage(sender);
+        break;
+      }
       case kGetPeerDomainNetworkAddressesMessage: {
         serialization::BlockMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -228,6 +232,30 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
                                          move(message)));
 }
 
+void BlockLocator::processGetAllDomainNetworkAddressesMessage(const client_id receiver) {
+  // Reply to `receiver` with the network address of every registered block domain.
+  serialization::GetAllDomainNetworkAddressesResponseMessage proto;
+  // NOTE(zuyu): No lock is needed here, as all the writers of
+  // domain_network_addresses_ run in this single (BlockLocator) thread.
+  auto &proto_addresses = *proto.mutable_domain_network_addresses();
+  for (const auto &domain_network_address_pair : domain_network_addresses_) {
+    proto_addresses[domain_network_address_pair.first] = domain_network_address_pair.second;
+  }
+
+  // Serialize into an owning std::string rather than a malloc'd buffer, so the
+  // bytes are released even if constructing the TaggedMessage throws.
+  std::string proto_bytes;
+  CHECK(proto.SerializeToString(&proto_bytes));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes.data()), proto_bytes.size(),
+                        kGetAllDomainNetworkAddressesResponseMessage);
+
+  DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
+             << " sent GetAllDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, receiver, move(message)));
+}
+
 void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
                                                                const block_id block) {
   serialization::GetPeerDomainNetworkAddressesResponseMessage proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 4690369..fd4b9ee 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -80,6 +80,9 @@ class BlockLocator : public Thread {
     bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
     bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
 
+    bus_->RegisterClientAsReceiver(locator_client_id_, kGetAllDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
     bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
     bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
   }
@@ -141,6 +144,7 @@ class BlockLocator : public Thread {
  private:
   void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
   void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+  void processGetAllDomainNetworkAddressesMessage(const tmb::client_id receiver);
   void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
 
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e8f102a..60c4be8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -171,3 +171,7 @@ message LocateBlockResponseMessage {
 message GetPeerDomainNetworkAddressesResponseMessage {
   repeated string domain_network_addresses = 1;
 }
+
+message GetAllDomainNetworkAddressesResponseMessage {  // Reply from BlockLocator to StorageManager.
+  map<uint32, string> domain_network_addresses = 1;  // Block domain -> network address.
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index afdac92..7c2cd1b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -119,6 +119,8 @@ enum QueryExecutionMessageType : message_type_id {
   kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
   kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
   kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
+  kGetAllDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
+  kGetAllDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
   kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
 #endif
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 3f74af3..6494f62 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -85,6 +85,8 @@ class QueryExecutionUtil {
       case kDeleteBlockLocationMessage:                   return "DeleteBlockLocationMessage";
       case kLocateBlockMessage:                           return "LocateBlockMessage";
       case kLocateBlockResponseMessage:                   return "LocateBlockResponseMessage";
+      case kGetAllDomainNetworkAddressesMessage:          return "GetAllDomainNetworkAddressesMessage";
+      case kGetAllDomainNetworkAddressesResponseMessage:  return "GetAllDomainNetworkAddressesResponseMessage";
       case kGetPeerDomainNetworkAddressesMessage:         return "GetPeerDomainNetworkAddressesMessage";
       case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
       case kBlockDomainUnregistrationMessage:             return "BlockDomainUnregistrationMessage";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..a06301b 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -224,6 +224,9 @@ StorageManager::StorageManager(
   if (bus_) {
     storage_manager_client_id_ = bus_->Connect();
 
+    bus_->RegisterClientAsSender(storage_manager_client_id_, kGetAllDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
     bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
     bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
 
@@ -619,6 +622,56 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
   return domain_network_addresses;
 }
 
+string StorageManager::getPeerDomainNetworkAddress(const block_id_domain block_domain) {
+  // Fast path: the address is already cached.
+  {
+    SpinSharedMutexSharedLock<false> read_lock(block_domain_network_addresses_shared_mutex_);
+    const auto cit = block_domain_network_addresses_.find(block_domain);
+    if (cit != block_domain_network_addresses_.end()) {
+      return cit->second;
+    }
+  }
+
+  // NOTE: The exclusive lock is held across the blocking BlockLocator round trip,
+  // so concurrent lookups wait (spinning) until the cache has been refreshed.
+  SpinSharedMutexExclusiveLock<false> write_lock(block_domain_network_addresses_shared_mutex_);
+  // Check again, as another thread may have refreshed the cache in the meantime.
+  auto cit = block_domain_network_addresses_.find(block_domain);
+  if (cit != block_domain_network_addresses_.end()) {
+    return cit->second;
+  }
+
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, block_locator_client_id_,
+                                         TaggedMessage(kGetAllDomainNetworkAddressesMessage)));
+  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+             << " sent GetAllDomainNetworkAddressesMessage to BlockLocator";
+
+  const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  CHECK_EQ(block_locator_client_id_, annotated_message.sender);
+  CHECK_EQ(kGetAllDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+             << " received GetAllDomainNetworkAddressesResponseMessage from BlockLocator";
+
+  serialization::GetAllDomainNetworkAddressesResponseMessage proto;
+  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  // emplace() never overwrites an address that is already cached.
+  for (const auto &domain_network_address_pair : proto.domain_network_addresses()) {
+    block_domain_network_addresses_.emplace(domain_network_address_pair.first,
+                                            domain_network_address_pair.second);
+  }
+
+  // CHECK rather than DCHECK: dereferencing end() in a release build is undefined behavior.
+  cit = block_domain_network_addresses_.find(block_domain);
+  CHECK(cit != block_domain_network_addresses_.end())
+      << "BlockLocator has no network address for block domain " << block_domain;
+  return cit->second;
+}
+
 void StorageManager::sendBlockLocationMessage(const block_id block,
                                               const tmb::message_type_id message_type) {
   switch (message_type) {
@@ -663,37 +716,33 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // already loaded before this function gets called.
   BlockHandle loaded_handle;
 
-#ifdef QUICKSTEP_DISTRIBUTED
   // TODO(quickstep-team): Use a cost model to determine whether to load from
   // a remote peer or the disk.
-  if (BlockIdUtil::Domain(block) != block_domain_) {
-    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
-    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
-    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
-      DataExchangerClientAsync client(
-          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
-          this);
-
-      if (client.Pull(block, numa_node, &loaded_handle)) {
-        sendBlockLocationMessage(block, kAddBlockLocationMessage);
-        return loaded_handle;
-      }
-    }
+  const size_t num_slots = file_manager_->numSlots(block);
+  if (num_slots != 0) {
+    void *block_buffer = allocateSlots(num_slots, numa_node);
 
-    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-               << " from remote peers, so try to load from disk.";
-  }
-#endif
+    const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+    CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  const size_t num_slots = file_manager_->numSlots(block);
-  DEBUG_ASSERT(num_slots != 0);
-  void *block_buffer = allocateSlots(num_slots, numa_node);
+    loaded_handle.block_memory = block_buffer;
+    loaded_handle.block_memory_size = num_slots;
+  } else {
+    bool pull_succeeded = false;
 
-  const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
-  CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    DataExchangerClientAsync client(
+        grpc::CreateChannel(getPeerDomainNetworkAddress(BlockIdUtil::Domain(block)),
+                            grpc::InsecureChannelCredentials()),
+        this);
+    if (client.Pull(block, numa_node, &loaded_handle)) {
+      pull_succeeded = true;
+    }
+#endif
 
-  loaded_handle.block_memory = block_buffer;
-  loaded_handle.block_memory_size = num_slots;
+    CHECK(pull_succeeded) << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+  }
 
 #ifdef QUICKSTEP_DISTRIBUTED
   if (bus_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dc4b7e8..c40e1bf 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -456,6 +456,15 @@ class StorageManager {
   std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
 
   /**
+   * @brief Get the network address of the given block domain. On a cache
+   *        miss, fetch all domain addresses from BlockLocator and cache them.
+   * @param block_domain The domain of the block or blob to pull.
+   *
+   * @return The network address of the given block domain.
+   **/
+  std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);
+
+  /**
    * @brief Update the block location info in BlockLocator.
    *
    * @param block The given block or blob.
@@ -615,6 +624,10 @@ class StorageManager {
   std::unordered_map<block_id, BlockHandle> blocks_;
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;
 
+  // Used to pull a remote block.
+  std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;
+
   // This lock manager is used with the following contract:
   //   (1) A block cannot be evicted unless an exclusive lock is held on its
   //       lock shard.