You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/15 09:39:17 UTC
incubator-quickstep git commit: Refactored the data exchange process.
Repository: incubator-quickstep
Updated Branches:
refs/heads/refactor-data-exchange [created] 9fae916de
Refactored the data exchange process.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9fae916d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9fae916d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9fae916d
Branch: refs/heads/refactor-data-exchange
Commit: 9fae916de2a5efe97d1050d99340f59c5028df63
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Mar 15 02:39:02 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 15 02:39:02 2017 -0700
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 28 +++++++
query_execution/BlockLocator.hpp | 4 +
query_execution/QueryExecutionMessages.proto | 4 +
query_execution/QueryExecutionTypedefs.hpp | 2 +
query_execution/QueryExecutionUtil.hpp | 2 +
storage/StorageManager.cpp | 99 +++++++++++++++++------
storage/StorageManager.hpp | 13 +++
7 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 765021e..94a0263 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -137,6 +137,10 @@ void BlockLocator::run() {
processLocateBlockMessage(sender, proto.block_id());
break;
}
+ case kGetAllDomainNetworkAddressesMessage: {
+ processGetAllDomainNetworkAddressesMessage(sender);
+ break;
+ }
case kGetPeerDomainNetworkAddressesMessage: {
serialization::BlockMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -228,6 +232,30 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver,
move(message)));
}
+void BlockLocator::processGetAllDomainNetworkAddressesMessage(const client_id receiver) {
+  // Reply with every domain -> network address pair this BlockLocator knows about.
+  // NOTE(zuyu): No lock is needed here, as all the writers are in this single thread.
+  serialization::GetAllDomainNetworkAddressesResponseMessage response;
+  auto *address_map = response.mutable_domain_network_addresses();
+  for (const auto &entry : domain_network_addresses_) {
+    (*address_map)[entry.first] = entry.second;
+  }
+
+  const int serialized_size = response.ByteSize();
+  char *serialized = static_cast<char*>(malloc(serialized_size));
+  CHECK(response.SerializeToArray(serialized, serialized_size));
+
+  // The scratch buffer is freed right after the TaggedMessage is built from it.
+  TaggedMessage reply(static_cast<const void*>(serialized), serialized_size,
+                      kGetAllDomainNetworkAddressesResponseMessage);
+  free(serialized);
+
+  DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
+             << " sent GetAllDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, receiver, move(reply)));
+}
+
void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
const block_id block) {
serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 4690369..fd4b9ee 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -80,6 +80,9 @@ class BlockLocator : public Thread {
bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kGetAllDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsSender(locator_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
}
@@ -141,6 +144,7 @@ class BlockLocator : public Thread {
private:
void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+ void processGetAllDomainNetworkAddressesMessage(const tmb::client_id receiver);
void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e8f102a..60c4be8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -171,3 +171,7 @@ message LocateBlockResponseMessage {
message GetPeerDomainNetworkAddressesResponseMessage {
repeated string domain_network_addresses = 1;
}
+
+message GetAllDomainNetworkAddressesResponseMessage {
+ map<uint32, string> domain_network_addresses = 1;  // Block domain -> that domain's network address.
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index afdac92..7c2cd1b 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -119,6 +119,8 @@ enum QueryExecutionMessageType : message_type_id {
kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
+ kGetAllDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
+ kGetAllDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
#endif
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 3f74af3..6494f62 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -85,6 +85,8 @@ class QueryExecutionUtil {
case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage";
case kLocateBlockMessage: return "LocateBlockMessage";
case kLocateBlockResponseMessage: return "LocateBlockResponseMessage";
+ case kGetAllDomainNetworkAddressesMessage: return "GetAllDomainNetworkAddressesMessage";
+ case kGetAllDomainNetworkAddressesResponseMessage: return "GetAllDomainNetworkAddressesResponseMessage";
case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage";
case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..a06301b 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -224,6 +224,9 @@ StorageManager::StorageManager(
if (bus_) {
storage_manager_client_id_ = bus_->Connect();
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kGetAllDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
@@ -619,6 +622,56 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc
return domain_network_addresses;
}
+// Returns the network address of the peer serving 'block_domain'. A cache hit
+// takes only the shared lock; a miss fetches every known address from
+// BlockLocator with a blocking round trip made while holding the exclusive lock.
+string StorageManager::getPeerDomainNetworkAddress(const block_id_domain block_domain) {
+  {
+    SpinSharedMutexSharedLock<false> read_lock(block_domain_network_addresses_shared_mutex_);
+    const auto cit = block_domain_network_addresses_.find(block_domain);
+    if (cit != block_domain_network_addresses_.end()) {
+      return cit->second;
+    }
+  }
+
+  SpinSharedMutexExclusiveLock<false> write_lock(block_domain_network_addresses_shared_mutex_);
+  // Check one more time if the block domain network info got set up by someone else.
+  auto cit = block_domain_network_addresses_.find(block_domain);
+  if (cit != block_domain_network_addresses_.end()) {
+    return cit->second;
+  }
+
+  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+             << " sent GetAllDomainNetworkAddressesMessage to BlockLocator";
+  DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+  DCHECK(bus_ != nullptr);
+  CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, block_locator_client_id_,
+                                           TaggedMessage(kGetAllDomainNetworkAddressesMessage)));
+
+  const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  CHECK_EQ(block_locator_client_id_, annotated_message.sender);
+  CHECK_EQ(kGetAllDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+  DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+             << " received GetAllDomainNetworkAddressesResponseMessage from BlockLocator";
+
+  serialization::GetAllDomainNetworkAddressesResponseMessage proto;
+  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  // emplace() is a no-op for domains that are already cached.
+  for (const auto &domain_network_address_pair : proto.domain_network_addresses()) {
+    block_domain_network_addresses_.emplace(static_cast<block_id_domain>(domain_network_address_pair.first),
+                                            domain_network_address_pair.second);
+  }
+
+  cit = block_domain_network_addresses_.find(block_domain);
+  // CHECK, not DCHECK: in release builds cit->second on end() would be undefined behavior.
+  CHECK(cit != block_domain_network_addresses_.end())
+      << "BlockLocator has no network address for block domain " << block_domain;
+  return cit->second;
+}
+
void StorageManager::sendBlockLocationMessage(const block_id block,
const tmb::message_type_id message_type) {
switch (message_type) {
@@ -663,37 +716,33 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// already loaded before this function gets called.
BlockHandle loaded_handle;
-#ifdef QUICKSTEP_DISTRIBUTED
// TODO(quickstep-team): Use a cost model to determine whether to load from
// a remote peer or the disk.
- if (BlockIdUtil::Domain(block) != block_domain_) {
- DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
- const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
- for (const string &peer_domain_network_address : peer_domain_network_addresses) {
- DataExchangerClientAsync client(
- grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
- this);
-
- if (client.Pull(block, numa_node, &loaded_handle)) {
- sendBlockLocationMessage(block, kAddBlockLocationMessage);
- return loaded_handle;
- }
- }
+ const size_t num_slots = file_manager_->numSlots(block);
+ if (num_slots != 0) {
+ void *block_buffer = allocateSlots(num_slots, numa_node);
- DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
- }
-#endif
+ const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+ CHECK(status) << "Failed to read block from persistent storage: " << block;
- const size_t num_slots = file_manager_->numSlots(block);
- DEBUG_ASSERT(num_slots != 0);
- void *block_buffer = allocateSlots(num_slots, numa_node);
+ loaded_handle.block_memory = block_buffer;
+ loaded_handle.block_memory_size = num_slots;
+ } else {
+ bool pull_succeeded = false;
- const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
- CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+ DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ DataExchangerClientAsync client(
+ grpc::CreateChannel(getPeerDomainNetworkAddress(BlockIdUtil::Domain(block)),
+ grpc::InsecureChannelCredentials()),
+ this);
+ if (client.Pull(block, numa_node, &loaded_handle)) {
+ pull_succeeded = true;
+ }
+#endif
- loaded_handle.block_memory = block_buffer;
- loaded_handle.block_memory_size = num_slots;
+ CHECK(pull_succeeded) << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+ }
#ifdef QUICKSTEP_DISTRIBUTED
if (bus_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dc4b7e8..c40e1bf 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -456,6 +456,15 @@ class StorageManager {
std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
/**
+ * @brief Get the network address of the peer serving the given block domain.
+ *        On a local cache miss, this blocks on a round trip to BlockLocator.
+ *
+ * @param block_domain The domain of the block or blob to pull.
+ * @return The network address of the given block domain.
+ **/
+ std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);
+
+ /**
* @brief Update the block location info in BlockLocator.
*
* @param block The given block or blob.
@@ -615,6 +624,10 @@ class StorageManager {
std::unordered_map<block_id, BlockHandle> blocks_;
alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;
+ // Block domain -> network address cache for pulling remote blocks; guarded by the mutex below.
+ std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
+ alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;
+
// This lock manager is used with the following contract:
// (1) A block cannot be evicted unless an exclusive lock is held on its
// lock shard.