You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/03/29 00:08:19 UTC
[15/40] incubator-quickstep git commit: Removed unnecessary messages
in BlockLocator.
Removed unnecessary messages in BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/22bac39c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/22bac39c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/22bac39c
Branch: refs/heads/new-op
Commit: 22bac39c0c25b48ce34a3a8dc4a79c51716a4e75
Parents: 256f9dd
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 16 15:43:42 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 16 15:43:42 2017 -0700
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 35 --------------
query_execution/BlockLocator.hpp | 33 ++++++++------
query_execution/QueryExecutionMessages.proto | 4 --
query_execution/QueryExecutionTypedefs.hpp | 2 -
query_execution/QueryExecutionUtil.hpp | 2 -
query_execution/tests/BlockLocator_unittest.cpp | 48 ++------------------
6 files changed, 24 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 765021e..89c1c00 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -130,13 +130,6 @@ void BlockLocator::run() {
}
break;
}
- case kLocateBlockMessage: {
- serialization::BlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processLocateBlockMessage(sender, proto.block_id());
- break;
- }
case kGetPeerDomainNetworkAddressesMessage: {
serialization::BlockMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -200,34 +193,6 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive
move(message)));
}
-void BlockLocator::processLocateBlockMessage(const client_id receiver,
- const block_id block) {
- serialization::LocateBlockResponseMessage proto;
-
- // NOTE(zuyu): We don't need to protect here, as all the writers are in the
- // single thread.
- for (const block_id_domain domain : block_locations_[block]) {
- proto.add_block_domains(domain);
- }
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kLocateBlockResponseMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
- << " sent LocateBlockResponseMessage to StorageManager with Client " << receiver;
- CHECK(tmb::MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(bus_,
- locator_client_id_,
- receiver,
- move(message)));
-}
-
void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
const block_id block) {
serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 4690369..f5572ca 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -74,9 +74,6 @@ class BlockLocator : public Thread {
bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
- bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
- bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
-
bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
@@ -96,29 +93,38 @@ class BlockLocator : public Thread {
}
/**
- * @brief Get the block locality info for scheduling in ForemanDistributed.
+ * @brief Get the block domain info for a given block.
*
* @param block The given block.
- * @param shiftboss_index_for_block The index of Shiftboss that has loaded the
- * block in the buffer pool.
*
- * @return Whether the block locality info has found.
+ * @return The block domain info for a given block.
**/
- bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
- std::unordered_set<block_id_domain> block_domains;
+ std::unordered_set<block_id_domain> getBlockDomains(const block_id block) const {
{
// Lock 'block_locations_shared_mutex_' as briefly as possible as a
// reader.
SpinSharedMutexSharedLock<false> read_lock(block_locations_shared_mutex_);
const auto cit = block_locations_.find(block);
if (cit != block_locations_.end()) {
- block_domains = cit->second;
- } else {
- return false;
+ return cit->second;
}
}
- {
+ return std::unordered_set<block_id_domain>();
+ }
+
+ /**
+ * @brief Get the block locality info for scheduling in ForemanDistributed.
+ *
+ * @param block The given block.
+ * @param shiftboss_index_for_block The index of Shiftboss that has loaded the
+ * block in the buffer pool.
+ *
+ * @return Whether the block locality info has found.
+ **/
+ bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+ const std::unordered_set<block_id_domain> block_domains = getBlockDomains(block);
+ if (!block_domains.empty()) {
// NOTE(zuyu): This lock is held for the rest duration of this call, as the
// exclusive case is rare.
SpinSharedMutexSharedLock<false> read_lock(block_domain_to_shiftboss_index_shared_mutex_);
@@ -140,7 +146,6 @@ class BlockLocator : public Thread {
private:
void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
- void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e8f102a..c70b339 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -164,10 +164,6 @@ message BlockMessage {
required fixed64 block_id = 1;
}
-message LocateBlockResponseMessage {
- repeated uint32 block_domains = 1;
-}
-
message GetPeerDomainNetworkAddressesResponseMessage {
repeated string domain_network_addresses = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index afdac92..c56bcfd 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -115,8 +115,6 @@ enum QueryExecutionMessageType : message_type_id {
kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator.
kAddBlockLocationMessage, // From StorageManager to BlockLocator.
kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
- kLocateBlockMessage, // From StorageManager to BlockLocator.
- kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 3f74af3..1388426 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -83,8 +83,6 @@ class QueryExecutionUtil {
case kBlockDomainToShiftbossIndexMessage: return "BlockDomainToShiftbossIndexMessage";
case kAddBlockLocationMessage: return "AddBlockLocationMessage";
case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage";
- case kLocateBlockMessage: return "LocateBlockMessage";
- case kLocateBlockResponseMessage: return "LocateBlockResponseMessage";
case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage";
case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index b73c2f7..1a7cf17 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -21,6 +21,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <unordered_set>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
@@ -51,6 +52,7 @@ using std::malloc;
using std::move;
using std::string;
using std::unique_ptr;
+using std::unordered_set;
using std::vector;
using tmb::AnnotatedMessage;
@@ -79,9 +81,6 @@ class BlockLocatorTest : public ::testing::Test {
bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
- bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
- bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
-
bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
block_domain_ =
@@ -106,58 +105,21 @@ class BlockLocatorTest : public ::testing::Test {
move(message)));
}
- vector<block_id_domain> getPeerDomains(const block_id block) {
- serialization::BlockMessage proto;
- proto.set_block_id(block);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kLocateBlockMessage);
- free(proto_bytes);
-
- LOG(INFO) << "Worker wth Client " << worker_client_id_ << " sent LocateBlockMessage to BlockLocator";
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- worker_client_id_,
- locator_client_id_,
- move(message)));
-
- const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
- LOG(INFO) << "Worker with Client " << worker_client_id_
- << " received LocateBlockResponseMessage from BlockLocator";
-
- serialization::LocateBlockResponseMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- vector<block_id_domain> domains;
- for (int i = 0; i < response_proto.block_domains_size(); ++i) {
- domains.push_back(response_proto.block_domains(i));
- }
-
- return domains;
- }
-
void checkLoaded(const block_id block) {
const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
EXPECT_EQ(1u, peer_domain_network_addresses.size());
EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
- const vector<block_id_domain> domains = getPeerDomains(block);
+ const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
EXPECT_EQ(1u, domains.size());
- EXPECT_EQ(block_domain_, domains[0]);
+ EXPECT_EQ(1u, domains.count(block_domain_));
}
void checkEvicted(const block_id block) {
const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
EXPECT_TRUE(peer_domain_network_addresses.empty());
- const vector<block_id_domain> domains = getPeerDomains(block);
+ const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
EXPECT_TRUE(domains.empty());
}