You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/14 05:43:03 UTC
incubator-quickstep git commit: Retry pulling if RPC fails in the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-pull-retry [created] c2b200f84
Retry pulling if RPC fails in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c2b200f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c2b200f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c2b200f8
Branch: refs/heads/dist-pull-retry
Commit: c2b200f848d9528fe7dc826bfa1764b1a2d5bcd3
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 22:42:54 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 22:42:54 2017 -0700
----------------------------------------------------------------------
storage/StorageManager.cpp | 59 +++++++++++++++++++----------------------
1 file changed, 28 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c2b200f8/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..ad7bd9d 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -528,25 +528,21 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
grpc::CompletionQueue queue;
- unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
- stub_->AsyncPull(&context, request, &queue));
-
PullResponse response;
grpc::Status status;
- rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
-
- void *got_tag;
- bool ok = false;
+ do {
+ unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+ stub_->AsyncPull(&context, request, &queue));
+ rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
- queue.Next(&got_tag, &ok);
- CHECK(got_tag == reinterpret_cast<void*>(1));
- CHECK(ok);
+ void *got_tag = nullptr;
+ bool ok = false;
- if (!status.ok()) {
- LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
- return false;
- }
+ queue.Next(&got_tag, &ok);
+ DCHECK(got_tag == reinterpret_cast<void*>(1));
+ DCHECK(ok);
+ } while (!status.ok());
if (!response.is_valid()) {
LOG(INFO) << "The pulling block not found in all the peers";
@@ -663,10 +659,21 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// already loaded before this function gets called.
BlockHandle loaded_handle;
-#ifdef QUICKSTEP_DISTRIBUTED
// TODO(quickstep-team): Use a cost model to determine whether to load from
// a remote peer or the disk.
- if (BlockIdUtil::Domain(block) != block_domain_) {
+ const size_t num_slots = file_manager_->numSlots(block);
+ if (num_slots != 0) {
+ void *block_buffer = allocateSlots(num_slots, numa_node);
+
+ const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+ CHECK(status) << "Failed to read block from persistent storage: " << block;
+
+ loaded_handle.block_memory = block_buffer;
+ loaded_handle.block_memory_size = num_slots;
+ } else {
+ bool pull_succeeded = false;
+
+#ifdef QUICKSTEP_DISTRIBUTED
DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
for (const string &peer_domain_network_address : peer_domain_network_addresses) {
@@ -675,25 +682,15 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
this);
if (client.Pull(block, numa_node, &loaded_handle)) {
- sendBlockLocationMessage(block, kAddBlockLocationMessage);
- return loaded_handle;
+ pull_succeeded = true;
+ break;
}
}
-
- DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
- }
#endif
- const size_t num_slots = file_manager_->numSlots(block);
- DEBUG_ASSERT(num_slots != 0);
- void *block_buffer = allocateSlots(num_slots, numa_node);
-
- const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
- CHECK(status) << "Failed to read block from persistent storage: " << block;
-
- loaded_handle.block_memory = block_buffer;
- loaded_handle.block_memory_size = num_slots;
+ CHECK(pull_succeeded)
+ << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+ }
#ifdef QUICKSTEP_DISTRIBUTED
if (bus_) {