You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 01:10:07 UTC
[38/48] incubator-quickstep git commit: Added BlockLocator.
Added BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3789da72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3789da72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3789da72
Branch: refs/heads/query-manager-used-in-foreman
Commit: 3789da728e95b91bd97587f5e34da6ff1b55ea5f
Parents: 4503198
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Sat May 28 14:24:13 2016 -0700
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Wed Jun 8 11:57:46 2016 -0700
----------------------------------------------------------------------
query_execution/BlockLocator.cpp | 223 +++++++++++++++
query_execution/BlockLocator.hpp | 125 +++++++++
query_execution/CMakeLists.txt | 48 ++++
query_execution/QueryExecutionMessages.proto | 34 +++
query_execution/QueryExecutionTypedefs.hpp | 16 ++
query_execution/tests/BlockLocator_unittest.cpp | 270 +++++++++++++++++++
storage/CMakeLists.txt | 9 +-
storage/StorageManager.cpp | 190 ++++++++++++-
storage/StorageManager.hpp | 76 +++++-
9 files changed, 984 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
new file mode 100644
index 0000000..6cf5249
--- /dev/null
+++ b/query_execution/BlockLocator.cpp
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/BlockLocator.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+// Event loop of the BlockLocator thread. Optionally pins the thread to
+// 'cpu_id_', then blocks on the TMB and dispatches every received message by
+// its type until a kPoisonMessage arrives, at which point the loop returns.
+void BlockLocator::run() {
+  if (cpu_id_ >= 0) {
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, causing this thread to sleep until next
+    // message is received.
+    const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+    LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type()
+              << "' message from TMB Client " << sender;
+    switch (tagged_message.message_type()) {
+      case kBlockDomainRegistrationMessage: {
+        serialization::BlockDomainRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
+        break;
+      }
+      case kAddBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+
+        // Both indexes must be kept in sync: either both insertions are new,
+        // or both entries already existed.
+        const auto result_block_locations = block_locations_[block].insert(domain);
+        const auto result_domain_blocks = domain_blocks_[domain].insert(block);
+        DCHECK_EQ(result_block_locations.second, result_domain_blocks.second);
+
+        if (result_domain_blocks.second) {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain;
+        }
+        break;
+      }
+      case kDeleteBlockLocationMessage: {
+        serialization::BlockLocationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id block = proto.block_id();
+        const block_id_domain domain = proto.block_domain();
+
+        // Use find() rather than operator[] so that a delete request for an
+        // unknown block or domain does not insert an empty entry as a side
+        // effect of the lookup.
+        const auto block_it = block_locations_.find(block);
+        if (block_it != block_locations_.end() && block_it->second.erase(domain) > 0) {
+          // Drop the location set once no domain holds the block any more, so
+          // that the map does not accumulate empty sets over time.
+          if (block_it->second.empty()) {
+            block_locations_.erase(block_it);
+          }
+
+          const auto domain_it = domain_blocks_.find(domain);
+          if (domain_it != domain_blocks_.end()) {
+            domain_it->second.erase(block);
+          }
+
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain;
+        } else {
+          LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain;
+        }
+        break;
+      }
+      case kLocateBlockMessage: {
+        serialization::BlockMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processLocateBlockMessage(sender, proto.block_id());
+        break;
+      }
+      case kGetPeerDomainNetworkAddressesMessage: {
+        serialization::BlockMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+        break;
+      }
+      case kBlockDomainUnregistrationMessage: {
+        serialization::BlockDomainMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const block_id_domain domain = proto.block_domain();
+
+        domain_network_addresses_.erase(domain);
+
+        // Remove the domain from the location set of every block it held.
+        // find() avoids creating entries for a domain that was never
+        // registered (or was already unregistered).
+        const auto domain_it = domain_blocks_.find(domain);
+        if (domain_it != domain_blocks_.end()) {
+          for (const block_id block : domain_it->second) {
+            const auto block_it = block_locations_.find(block);
+            if (block_it == block_locations_.end()) {
+              continue;
+            }
+
+            block_it->second.erase(domain);
+            if (block_it->second.empty()) {
+              block_locations_.erase(block_it);
+            }
+          }
+          domain_blocks_.erase(domain_it);
+        }
+
+        LOG(INFO) << "Unregistered Domain " << domain;
+        break;
+      }
+      case kPoisonMessage: {
+        return;
+      }
+      default: {
+        // Make an unexpected message type visible instead of silently
+        // dropping it; the loop keeps serving subsequent messages.
+        LOG(WARNING) << "BlockLocator ignored the unexpected typed '"
+                     << tagged_message.message_type() << "' message from TMB Client " << sender;
+        break;
+      }
+    }
+  }
+}
+
+// Assigns the next block domain to the requester, records its network address,
+// and replies to 'receiver' with a BlockDomainMessage carrying the new domain.
+void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,
+                                                         const std::string &network_address) {
+  DCHECK_LT(block_domain_, kMaxDomain);
+
+  // Domains are handed out by pre-incrementing the counter.
+  const block_id_domain new_domain = ++block_domain_;
+  domain_network_addresses_.emplace(new_domain, network_address);
+  // Create the (initially empty) set of blocks held by the new domain.
+  domain_blocks_[new_domain];
+
+  serialization::BlockDomainMessage response;
+  response.set_block_domain(new_domain);
+
+  // TaggedMessage is constructed from the serialized bytes, after which the
+  // temporary buffer is released.
+  const int response_length = response.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_length));
+  CHECK(response.SerializeToArray(response_bytes, response_length));
+
+  TaggedMessage tagged_response(static_cast<const void*>(response_bytes),
+                                response_length,
+                                kBlockDomainRegistrationResponseMessage);
+  free(response_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent BlockDomainRegistrationResponseMessage (typed '"
+            << kBlockDomainRegistrationResponseMessage
+            << "') to Worker (id '" << receiver << "')";
+
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         locator_client_id_,
+                                         receiver,
+                                         move(tagged_response));
+  CHECK(tmb::MessageBus::SendStatus::kOK == send_status);
+}
+
+// Replies to 'receiver' with a LocateBlockResponseMessage listing every block
+// domain currently recorded as holding 'block'. The list is empty when the
+// block is unknown.
+void BlockLocator::processLocateBlockMessage(const client_id receiver,
+                                             const block_id block) {
+  serialization::LocateBlockResponseMessage proto;
+
+  // Look the block up with find(): operator[] would insert an empty set for
+  // every unknown block that is queried, growing the map on read-only requests.
+  const auto block_it = block_locations_.find(block);
+  if (block_it != block_locations_.end()) {
+    for (const block_id_domain domain : block_it->second) {
+      proto.add_block_domains(domain);
+    }
+  }
+
+  // TaggedMessage is constructed from the serialized bytes, after which the
+  // temporary buffer is released.
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kLocateBlockResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           locator_client_id_,
+                                           receiver,
+                                           move(message)));
+}
+
+// Replies to 'receiver' with a GetPeerDomainNetworkAddressesResponseMessage
+// holding the network address of every registered domain currently recorded as
+// holding 'block'. The list is empty when the block is unknown.
+void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
+                                                               const block_id block) {
+  serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+
+  // Both lookups use find(): operator[] would insert an empty location set for
+  // an unknown block, and an empty address string for an unknown domain (which
+  // would then also be returned to the caller as if it were a valid peer).
+  const auto block_it = block_locations_.find(block);
+  if (block_it != block_locations_.end()) {
+    for (const block_id_domain domain : block_it->second) {
+      const auto address_it = domain_network_addresses_.find(domain);
+      if (address_it == domain_network_addresses_.end()) {
+        LOG(WARNING) << "Block " << BlockIdUtil::ToString(block)
+                     << " is recorded in Domain " << domain
+                     << ", which has no registered network address";
+        continue;
+      }
+
+      proto.add_domain_network_addresses(address_it->second);
+    }
+  }
+
+  // TaggedMessage is constructed from the serialized bytes, after which the
+  // temporary buffer is released.
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kGetPeerDomainNetworkAddressesResponseMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "BlockLocator (id '" << locator_client_id_
+            << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '"
+            << kGetPeerDomainNetworkAddressesResponseMessage
+            << "') to StorageManager (id '" << receiver << "')";
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           locator_client_id_,
+                                           receiver,
+                                           move(message)));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
new file mode 100644
index 0000000..bbd9b8f
--- /dev/null
+++ b/query_execution/BlockLocator.hpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
+
+#include <atomic>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class for keeping track of blocks loaded in a Worker's buffer pool
+ *        in the distributed version.
+ **/
+class BlockLocator : public Thread {
+ public:
+  /**
+   * @brief Constructor. Connects to the TMB and registers this client as the
+   *        receiver / sender of all BlockLocator related message types.
+   *
+   * @param bus A pointer to the TMB.
+   * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned.
+   *
+   * @note If cpu_id is not specified, BlockLocator thread can be possibly moved
+   *       around on different CPUs by the OS.
+   **/
+  explicit BlockLocator(tmb::MessageBus *bus,
+                        const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        cpu_id_(cpu_id),
+        block_domain_(0) {
+    locator_client_id_ = bus_->Connect();
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
+    bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+    bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
+    bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
+  }
+
+  ~BlockLocator() override {}
+
+  /**
+   * @brief Get the TMB client ID of BlockLocator thread.
+   *
+   * @return TMB client ID of BlockLocator thread.
+   **/
+  tmb::client_id getBusClientID() const {
+    return locator_client_id_;
+  }
+
+ protected:
+  /**
+   * @brief The thread body: receives messages from the TMB and handles them
+   *        until a poison message is received.
+   **/
+  void run() override;
+
+ private:
+  // Registers a new block domain for 'network_address' and sends the assigned
+  // domain back to 'receiver'.
+  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
+
+  // Sends 'receiver' the block domains recorded as holding 'block'.
+  void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
+
+  // Sends 'receiver' the network addresses of the domains recorded as holding
+  // 'block'.
+  void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+
+  tmb::MessageBus *bus_;
+
+  // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
+  const int cpu_id_;
+
+  // The most recently assigned block domain; incremented on each registration.
+  alignas(kCacheLineBytes) std::atomic<block_id_domain> block_domain_;
+
+  // From a block domain to its network info in the ip:port format, e.g.,
+  // "0.0.0.0:0".
+  std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
+
+  // From a block to its domains.
+  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+
+  // From a block domain to all blocks loaded in its buffer pool.
+  std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;
+
+  // The TMB client ID obtained by this BlockLocator in the constructor.
+  tmb::client_id locator_client_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(BlockLocator);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..7d9d601 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -20,6 +20,9 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs
QueryExecutionMessages.proto)
# Declare micro-libs:
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+endif()
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -40,6 +43,19 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag
add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
# Link dependencies:
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_BlockLocator
+ glog
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_threading_Thread
+ quickstep_threading_ThreadUtil
+ quickstep_utility_Macros
+ tmb)
+endif()
target_link_libraries(quickstep_queryexecution_Foreman
glog
gtest
@@ -176,7 +192,37 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryexecution_WorkerSelectionPolicy)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution
+ quickstep_queryexecution_BlockLocator)
+endif()
+
# Tests:
+if (ENABLE_DISTRIBUTED)
+ add_executable(BlockLocator_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp")
+ target_link_libraries(BlockLocator_unittest
+ gflags_nothreads-static
+ glog
+ gtest
+ quickstep_catalog_CatalogAttribute
+ quickstep_catalog_CatalogRelation
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ tmb
+ ${LIBS})
+ add_test(BlockLocator_unittest BlockLocator_unittest)
+endif()
+
add_executable(Foreman_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
target_link_libraries(Foreman_unittest
@@ -269,3 +315,5 @@ target_link_libraries(WorkerSelectionPolicy_unittest
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerSelectionPolicy)
add_test(WorkerSelectionPolicy_unittest WorkerSelectionPolicy_unittest)
+
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/block_locator_test_data/)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 8d2efd0..15803cf 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,10 @@ syntax = "proto2";
package quickstep.serialization;
+// Used for any messages that do not carry payloads.
+message EmptyMessage {
+}
+
// Used for both Normal WorkOrders and RebuildWorkOrders.
// NOTE(zuyu): we might need to seperate the completion messages to contain
// run-time information for Foreman to make better decisions on scheduling
@@ -42,3 +46,33 @@ message DataPipelineMessage {
message WorkOrdersAvailableMessage {
required uint64 operator_index = 1;
}
+
+// BlockLocator related messages.
+message BlockDomainRegistrationMessage {
+ // Format IP:Port, e.g., "0.0.0.0:0".
+ required string domain_network_address = 1;
+}
+
+// Used for RegistrationResponse, Unregistration, and FailureReport.
+message BlockDomainMessage {
+ required uint32 block_domain = 1;
+}
+
+// Used when StorageManager loads or evicts a block or a blob from its buffer
+// pool.
+message BlockLocationMessage {
+ required fixed64 block_id = 1;
+ required uint32 block_domain = 2;
+}
+
+message BlockMessage {
+ required fixed64 block_id = 1;
+}
+
+message LocateBlockResponseMessage {
+ repeated uint32 block_domains = 1;
+}
+
+message GetPeerDomainNetworkAddressesResponseMessage {
+ repeated string domain_network_addresses = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 36cfd82..fc253bc 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
#include "threading/ThreadIDBasedMap.hpp"
#include "tmb/address.h"
@@ -55,6 +56,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
'a',
'p'>;
+// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
@@ -66,6 +68,20 @@ enum QueryExecutionMessageType : message_type_id {
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers.
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ // BlockLocator related messages, sorted in a life cycle of StorageManager
+ // with a unique block domain.
+ kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
+ kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kAddBlockLocationMessage, // From StorageManager to BlockLocator.
+ kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
+ kLocateBlockMessage, // From StorageManager to BlockLocator.
+ kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
+ kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
+ kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
+ kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
+#endif
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
new file mode 100644
index 0000000..fe7b86b
--- /dev/null
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -0,0 +1,270 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+// Test fixture that starts a BlockLocator thread on an in-process message bus,
+// registers a block domain on behalf of a fake worker client, and creates a
+// StorageManager bound to that domain.
+class BlockLocatorTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+  static const char kDomainNetworkAddress[];
+
+  ~BlockLocatorTest() {
+    // TearDown() has sent the poison message; wait for the locator thread to
+    // exit before the bus and the locator are destroyed.
+    locator_->join();
+  }
+
+  virtual void SetUp() {
+    bus_.Initialize();
+
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+
+    worker_client_id_ = bus_.Connect();
+
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+    block_domain_ = getBlockDomain(kDomainNetworkAddress);
+
+    storage_manager_.reset(
+        new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
+  }
+
+  virtual void TearDown() {
+    // Destroy the StorageManager before the BlockLocator is told to stop.
+    storage_manager_.reset();
+
+    serialization::EmptyMessage proto;
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kPoisonMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::SendTMBMessage(&bus_,
+                                             worker_client_id_,
+                                             locator_client_id_,
+                                             move(message)));
+  }
+
+  // Asks the BlockLocator which domains hold 'block' and returns them.
+  vector<block_id_domain> getPeerDomains(const block_id block) {
+    serialization::BlockMessage proto;
+    proto.set_block_id(block);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kLocateBlockMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage
+              << "') to BlockLocator";
+    CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::SendTMBMessage(&bus_,
+                                             worker_client_id_,
+                                             locator_client_id_,
+                                             move(message)));
+
+    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type());
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received LocateBlockResponseMessage from BlockLocator";
+
+    serialization::LocateBlockResponseMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    vector<block_id_domain> domains;
+    domains.reserve(response_proto.block_domains_size());
+    for (int i = 0; i < response_proto.block_domains_size(); ++i) {
+      domains.push_back(response_proto.block_domains(i));
+    }
+
+    return domains;
+  }
+
+  // Verifies that 'block' is reported as loaded in exactly this test's domain.
+  void checkLoaded(const block_id block) {
+    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
+    // ASSERT (not EXPECT) on the size: the element access below would read
+    // out of bounds if the vector were empty.
+    ASSERT_EQ(1u, peer_domain_network_addresses.size());
+    EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
+
+    const vector<block_id_domain> domains = getPeerDomains(block);
+    ASSERT_EQ(1u, domains.size());
+    EXPECT_EQ(block_domain_, domains[0]);
+  }
+
+  // Verifies that 'block' is not reported as loaded in any domain.
+  void checkEvicted(const block_id block) {
+    const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
+    EXPECT_TRUE(peer_domain_network_addresses.empty());
+
+    const vector<block_id_domain> domains = getPeerDomains(block);
+    EXPECT_TRUE(domains.empty());
+  }
+
+  tmb::client_id worker_client_id_;
+
+  block_id_domain block_domain_;
+  unique_ptr<StorageManager> storage_manager_;
+
+ private:
+  // Registers 'network_address' with the BlockLocator and returns the block
+  // domain carried in its response.
+  block_id_domain getBlockDomain(const string &network_address) {
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainRegistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::SendTMBMessage(&bus_,
+                                             worker_client_id_,
+                                             locator_client_id_,
+                                             move(message)));
+
+    const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage from BlockLocator";
+
+    serialization::BlockDomainMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  MessageBusImpl bus_;
+
+  unique_ptr<BlockLocator> locator_;
+  tmb::client_id locator_client_id_;
+};
+
+const char BlockLocatorTest::kStoragePath[] = "./block_locator_test_data/";
+const char BlockLocatorTest::kDomainNetworkAddress[] = "ip:port";
+
+// Walks a block through create -> save/evict -> reload -> delete, checking
+// after each step what the BlockLocator reports for it.
+TEST_F(BlockLocatorTest, BlockTest) {
+  CatalogRelation rel(nullptr, "rel");
+  rel.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
+
+  // A newly created block is reported as loaded.
+  const block_id new_block =
+      storage_manager_->createBlock(rel, rel.getDefaultStorageBlockLayout());
+  checkLoaded(new_block);
+
+  // After saving and evicting, the block is no longer reported.
+  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(new_block));
+  storage_manager_->evictBlockOrBlob(new_block);
+  checkEvicted(new_block);
+
+  // Fetching the block again makes it reported as loaded; the reference is
+  // scoped so that it is released before the block file is deleted.
+  {
+    const BlockReference reloaded_block = storage_manager_->getBlock(new_block, rel);
+  }
+  checkLoaded(new_block);
+
+  // Deleting the block file removes it from the reported locations.
+  storage_manager_->deleteBlockOrBlobFile(new_block);
+  checkEvicted(new_block);
+}
+
+// Walks a blob through create -> save/evict -> reload -> delete, checking
+// after each step what the BlockLocator reports for it.
+TEST_F(BlockLocatorTest, BlobTest) {
+  // A newly created blob is reported as loaded.
+  const block_id new_blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
+  checkLoaded(new_blob);
+
+  // After saving and evicting, the blob is no longer reported.
+  ASSERT_TRUE(storage_manager_->saveBlockOrBlob(new_blob));
+  storage_manager_->evictBlockOrBlob(new_blob);
+  checkEvicted(new_blob);
+
+  // Fetching the blob again makes it reported as loaded; the reference is
+  // scoped so that it is released before the blob file is deleted.
+  {
+    const BlobReference reloaded_blob = storage_manager_->getBlob(new_blob);
+  }
+  checkLoaded(new_blob);
+
+  // Deleting the blob file removes it from the reported locations.
+  storage_manager_->deleteBlockOrBlobFile(new_blob);
+  checkEvicted(new_blob);
+}
+
+} // namespace quickstep
+
+// Test entry point: sets up logging, command-line flags and GoogleTest, then
+// runs every registered test and returns the overall result.
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::testing::InitGoogleTest(&argc, argv);
+
+  const int test_result = RUN_ALL_TESTS();
+  return test_result;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 87a5e54..4da16ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -941,7 +941,8 @@ target_link_libraries(quickstep_storage_StorageManager
quickstep_utility_Alignment
quickstep_utility_CalculateInstalledMemory
quickstep_utility_Macros
- quickstep_utility_ShardedLockManager)
+ quickstep_utility_ShardedLockManager
+ tmb)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
target_link_libraries(quickstep_storage_StorageManager
quickstep_storage_FileManagerHdfs)
@@ -950,6 +951,12 @@ if (QUICKSTEP_HAVE_LIBNUMA)
target_link_libraries(quickstep_storage_StorageManager
${LIBNUMA_LIBRARY})
endif()
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_storage_StorageManager
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil)
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_storage_SubBlockTypeRegistry
glog
quickstep_storage_StorageBlockLayout_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 5d91052..15e2503 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -18,6 +18,7 @@
// This is included before other files so that we can conditionally determine
// what else to include.
#include "catalog/CatalogConfig.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
#include "storage/StorageConfig.h"
// Define feature test macros to enable large page support for mmap.
@@ -52,6 +53,12 @@
#include <unordered_map>
#include <vector>
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#endif
+
#include "storage/CountedReference.hpp"
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManagerLocal.hpp"
@@ -74,6 +81,13 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#include "tmb/id_typedefs.h"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+#endif
+
using std::free;
using std::int32_t;
using std::memset;
@@ -81,6 +95,15 @@ using std::size_t;
using std::string;
using std::vector;
+#ifdef QUICKSTEP_DISTRIBUTED
+using std::malloc;
+using std::move;
+using std::unique_ptr;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+#endif
+
namespace quickstep {
static bool ValidateBlockDomain(const char *flagname,
@@ -157,14 +180,21 @@ DEFINE_bool(use_hdfs, false, "Use HDFS as the persistent storage, instead of the
#endif
StorageManager::StorageManager(
- const std::string &storage_path,
- const block_id_domain block_domain,
- const size_t max_memory_usage,
- EvictionPolicy *eviction_policy)
+ const std::string &storage_path,
+ const block_id_domain block_domain,
+ const size_t max_memory_usage,
+ EvictionPolicy *eviction_policy,
+ const tmb::client_id block_locator_client_id,
+ tmb::MessageBus *bus)
: storage_path_(storage_path),
total_memory_usage_(0),
max_memory_usage_(max_memory_usage),
- eviction_policy_(eviction_policy) {
+ eviction_policy_(eviction_policy),
+#ifdef QUICKSTEP_DISTRIBUTED
+ block_domain_(block_domain),
+#endif
+ block_locator_client_id_(block_locator_client_id),
+ bus_(bus) {
#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
if (FLAGS_use_hdfs) {
file_manager_.reset(new FileManagerHdfs(storage_path));
@@ -175,10 +205,55 @@ StorageManager::StorageManager(
file_manager_.reset(new FileManagerLocal(storage_path));
#endif
+#ifdef QUICKSTEP_DISTRIBUTED
+ // NOTE(zuyu): The following if-condition is a workaround to bypass code for
+ // the distributed version in some unittests that do not use TMB. The
+ // end-to-end functional tests for the distributed version, however, would not
+ // be affected.
+ if (bus_) {
+ storage_manager_client_id_ = bus_->Connect();
+
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') starts with Domain " << block_domain;
+ }
+#endif
+
block_index_ = BlockIdUtil::GetBlockId(block_domain, file_manager_->getMaxUsedBlockCounter(block_domain));
}
StorageManager::~StorageManager() {
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ serialization::BlockDomainMessage proto;
+ proto.set_block_domain(block_domain_);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainUnregistrationMessage);
+ free(proto_bytes);
+
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage
+ << "') to BlockLocator";
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+ }
+#endif
+
for (std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.begin();
it != blocks_.end();
++it) {
@@ -222,6 +297,12 @@ block_id StorageManager::createBlock(const CatalogRelationSchema &relation,
// Make '*eviction_policy_' aware of the new block's existence.
eviction_policy_->blockCreated(new_block_id);
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+ }
+#endif
+
return new_block_id;
}
@@ -249,6 +330,12 @@ block_id StorageManager::createBlob(const std::size_t num_slots,
// Make '*eviction_policy_' aware of the new blob's existence.
eviction_policy_->blockCreated(new_block_id);
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage);
+ }
+#endif
+
return new_block_id;
}
@@ -315,6 +402,12 @@ bool StorageManager::saveBlockOrBlob(const block_id block, const bool force) {
}
void StorageManager::evictBlockOrBlob(const block_id block) {
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(block, kDeleteBlockLocationMessage);
+ }
+#endif
+
BlockHandle handle;
{
SpinSharedMutexExclusiveLock<false> write_lock(blocks_shared_mutex_);
@@ -362,6 +455,87 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
return ++block_index_;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+// Sends a kGetPeerDomainNetworkAddressesMessage carrying 'block' to the
+// BlockLocator over the TMB, then reads the reply and returns the network
+// addresses it lists, in the order they appear in the response. CHECK-fails
+// if serialization or sending fails, or if the received message does not come
+// from the BlockLocator with the expected response type.
+vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
+ serialization::BlockMessage proto;
+ proto.set_block_id(block);
+
+ // Serialize the request into a temporary heap buffer.
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kGetPeerDomainNetworkAddressesMessage);
+ // The buffer is released before 'message' is sent, so this relies on
+ // TaggedMessage having taken its own copy of the bytes -- TODO confirm.
+ free(proto_bytes);
+
+ // NOTE: this "sent" line is logged before the send below is attempted.
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage
+ << "') to BlockLocator";
+
+ // Unlike the other senders in this file, this method is not guarded by
+ // 'if (bus_)', so the preconditions are asserted here (debug builds only).
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+
+ // Receive one message addressed to this StorageManager; presumably this
+ // waits until the BlockLocator's reply arrives -- verify against the TMB API.
+ const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ // Anything other than the expected response from the BlockLocator is fatal.
+ CHECK_EQ(block_locator_client_id_, annotated_message.sender);
+ CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
+
+ serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
+ CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ // Copy the repeated 'domain_network_addresses' field into the result.
+ vector<string> domain_network_addresses;
+ for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) {
+ domain_network_addresses.push_back(response_proto.domain_network_addresses(i));
+ }
+
+ return domain_network_addresses;
+}
+
+// Sends a BlockLocationMessage (the id of 'block' plus this StorageManager's
+// block domain) to the BlockLocator, tagged with 'message_type'. Only
+// kAddBlockLocationMessage and kDeleteBlockLocationMessage are accepted; any
+// other type hits LOG(FATAL). CHECK-fails if serialization or sending fails.
+// Every call site visible in this file guards the call with 'if (bus_)'.
+void StorageManager::sendBlockLocationMessage(const block_id block,
+ const tmb::message_type_id message_type) {
+ // Log what happened to the block. Note that the "Loaded" wording is used for
+ // every kAddBlockLocationMessage, including those sent from createBlock()
+ // and createBlob().
+ switch (message_type) {
+ case kAddBlockLocationMessage:
+ LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ break;
+ case kDeleteBlockLocationMessage:
+ LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_;
+ break;
+ default:
+ LOG(FATAL) << "Unknown message type " << message_type;
+ }
+
+ serialization::BlockLocationMessage proto;
+ proto.set_block_id(block);
+ proto.set_block_domain(block_domain_);
+
+ // Serialize the payload into a temporary heap buffer.
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ message_type);
+ // The buffer is released before 'message' is sent, so this relies on
+ // TaggedMessage having taken its own copy of the bytes -- TODO confirm.
+ free(proto_bytes);
+
+ // NOTE: this "sent" line is logged before the send below is attempted.
+ LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_
+ << "') sent BlockLocationMessage (typed '" << message_type
+ << "') to BlockLocator";
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ storage_manager_client_id_,
+ block_locator_client_id_,
+ move(message)));
+}
+#endif
+
StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
const block_id block, const int numa_node) {
// The caller of this function holds an exclusive lock on this block/blob's
@@ -378,6 +552,12 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
loaded_handle.block_memory = block_buffer;
loaded_handle.block_memory_size = num_slots;
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ }
+#endif
+
return loaded_handle;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 52326c2..55a011e 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -20,11 +20,14 @@
#include <atomic>
#include <chrono>
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
+
#include "storage/CountedReference.hpp"
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManager.hpp"
@@ -40,6 +43,10 @@
#include "gflags/gflags.h"
#include "gtest/gtest_prod.h"
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
namespace quickstep {
DECLARE_int32(block_domain);
@@ -50,6 +57,7 @@ DECLARE_bool(use_hdfs);
#endif
class CatalogRelationSchema;
+
class StorageBlockLayout;
/** \addtogroup Storage
@@ -104,6 +112,33 @@ class StorageManager {
std::chrono::milliseconds(200))) {
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Constructor.
+ * @param storage_path The filesystem directory where blocks have persistent
+ * storage.
+ * @param block_domain The unique block domain.
+ * @param block_locator_client_id The TMB client ID of the block locator.
+ * @param bus A pointer to the TMB.
+ *
+ * @exception CorruptPersistentStorage The storage directory layout is not
+ * in the expected format.
+ **/
+ StorageManager(const std::string &storage_path,
+ const block_id_domain block_domain,
+ const tmb::client_id block_locator_client_id,
+ tmb::MessageBus *bus)
+ : StorageManager(storage_path,
+ block_domain,
+ FLAGS_buffer_pool_slots,
+ LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+ 2,
+ std::chrono::milliseconds(200)),
+ block_locator_client_id,
+ bus) {
+ }
+#endif
+
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
@@ -121,13 +156,18 @@ class StorageManager {
* @param eviction_policy The eviction policy that the storage manager should
* use to manage the cache. The storage manager takes
* ownership of *eviction_policy.
+ * @param block_locator_client_id The TMB client ID of the block locator.
+ * @param bus A pointer to the TMB.
+ *
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
**/
StorageManager(const std::string &storage_path,
const block_id_domain block_domain,
const size_t max_memory_usage,
- EvictionPolicy *eviction_policy);
+ EvictionPolicy *eviction_policy,
+ const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
+ tmb::MessageBus *bus = nullptr);
/**
* @brief Destructor which also destroys all managed blocks.
@@ -332,6 +372,27 @@ class StorageManager {
StorageBlockBase *block;
};
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Get the network info of all the remote StorageManagers which may
+ * load the given block in the buffer pool.
+ *
+ * @param block The block or blob to pull.
+ *
+ * @return The network info of all the possible peers to pull.
+ **/
+ std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+
+ /**
+ * @brief Update the block location info in BlockLocator.
+ *
+ * @param block The given block or blob.
+ * @param message_type Indicate whether to add or delete a block location.
+ **/
+ void sendBlockLocationMessage(const block_id block,
+ const tmb::message_type_id message_type);
+#endif
+
// Helper for createBlock() and createBlob(). Allocates a block ID and memory
// slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
// writes the allocated slot range into 'handle->slot_index_low' and
@@ -459,6 +520,15 @@ class StorageManager {
std::unique_ptr<EvictionPolicy> eviction_policy_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ const block_id_domain block_domain_;
+
+ tmb::client_id storage_manager_client_id_;
+#endif
+
+ const tmb::client_id block_locator_client_id_;
+ tmb::MessageBus *bus_;
+
std::unique_ptr<FileManager> file_manager_;
// Used to generate unique IDs in allocateNewBlockOrBlob().
@@ -486,6 +556,10 @@ class StorageManager {
static constexpr std::size_t kLockManagerNumShards = 0x2000-1;
ShardedLockManager<block_id, kLockManagerNumShards, SpinSharedMutex<false>> lock_manager_;
+ friend class BlockLocatorTest;
+ FRIEND_TEST(BlockLocatorTest, BlockTest);
+ FRIEND_TEST(BlockLocatorTest, BlobTest);
+
FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);