You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/14 05:43:03 UTC

incubator-quickstep git commit: Retry pulling if RPC fails in the distributed version.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-pull-retry [created] c2b200f84


Retry pulling if RPC fails in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c2b200f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c2b200f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c2b200f8

Branch: refs/heads/dist-pull-retry
Commit: c2b200f848d9528fe7dc826bfa1764b1a2d5bcd3
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 22:42:54 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 22:42:54 2017 -0700

----------------------------------------------------------------------
 storage/StorageManager.cpp | 59 +++++++++++++++++++----------------------
 1 file changed, 28 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c2b200f8/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..ad7bd9d 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -528,25 +528,21 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
 
   grpc::CompletionQueue queue;
 
-  unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
-      stub_->AsyncPull(&context, request, &queue));
-
   PullResponse response;
   grpc::Status status;
 
-  rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
-
-  void *got_tag;
-  bool ok = false;
+  do {
+    unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+        stub_->AsyncPull(&context, request, &queue));
+    rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
 
-  queue.Next(&got_tag, &ok);
-  CHECK(got_tag == reinterpret_cast<void*>(1));
-  CHECK(ok);
+    void *got_tag = nullptr;
+    bool ok = false;
 
-  if (!status.ok()) {
-    LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
-    return false;
-  }
+    queue.Next(&got_tag, &ok);
+    DCHECK(got_tag == reinterpret_cast<void*>(1));
+    DCHECK(ok);
+  } while (!status.ok());
 
   if (!response.is_valid()) {
     LOG(INFO) << "The pulling block not found in all the peers";
@@ -663,10 +659,21 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // already loaded before this function gets called.
   BlockHandle loaded_handle;
 
-#ifdef QUICKSTEP_DISTRIBUTED
   // TODO(quickstep-team): Use a cost model to determine whether to load from
   // a remote peer or the disk.
-  if (BlockIdUtil::Domain(block) != block_domain_) {
+  const size_t num_slots = file_manager_->numSlots(block);
+  if (num_slots != 0) {
+    void *block_buffer = allocateSlots(num_slots, numa_node);
+
+    const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+    CHECK(status) << "Failed to read block from persistent storage: " << block;
+
+    loaded_handle.block_memory = block_buffer;
+    loaded_handle.block_memory_size = num_slots;
+  } else {
+    bool pull_succeeded = false;
+
+#ifdef QUICKSTEP_DISTRIBUTED
     DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
     const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
     for (const string &peer_domain_network_address : peer_domain_network_addresses) {
@@ -675,25 +682,15 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
           this);
 
       if (client.Pull(block, numa_node, &loaded_handle)) {
-        sendBlockLocationMessage(block, kAddBlockLocationMessage);
-        return loaded_handle;
+        pull_succeeded = true;
+        break;
       }
     }
-
-    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-               << " from remote peers, so try to load from disk.";
-  }
 #endif
 
-  const size_t num_slots = file_manager_->numSlots(block);
-  DEBUG_ASSERT(num_slots != 0);
-  void *block_buffer = allocateSlots(num_slots, numa_node);
-
-  const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
-  CHECK(status) << "Failed to read block from persistent storage: " << block;
-
-  loaded_handle.block_memory = block_buffer;
-  loaded_handle.block_memory_size = num_slots;
+    CHECK(pull_succeeded)
+        << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+  }
 
 #ifdef QUICKSTEP_DISTRIBUTED
   if (bus_) {