You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:14 UTC
[22/50] incubator-quickstep git commit: Added a util to get block
domain from BlockLocator.
Added a util to get block domain from BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/56555117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/56555117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/56555117
Branch: refs/heads/quickstep_partition_parser_support
Commit: 565551173e797c65651eef0408c74b55e27a5796
Parents: f7d1543
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 20 15:17:46 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 20 19:58:14 2016 -0800
----------------------------------------------------------------------
query_execution/BlockLocatorUtil.cpp | 93 ++++++++++++++++++++
query_execution/BlockLocatorUtil.hpp | 59 +++++++++++++
query_execution/CMakeLists.txt | 9 ++
query_execution/tests/BlockLocator_unittest.cpp | 42 +--------
query_optimizer/tests/CMakeLists.txt | 4 +-
.../DistributedExecutionGeneratorTestRunner.cpp | 54 +++---------
.../DistributedExecutionGeneratorTestRunner.hpp | 4 +-
storage/CMakeLists.txt | 4 +-
storage/tests/DataExchange_unittest.cpp | 50 ++---------
9 files changed, 187 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp
new file mode 100644
index 0000000..d2d1e96
--- /dev/null
+++ b/query_execution/BlockLocatorUtil.cpp
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_execution/BlockLocatorUtil.hpp"
+
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using tmb::TaggedMessage;
+using tmb::MessageBus;
+using tmb::client_id;
+
+namespace quickstep {
+namespace block_locator {
+
+namespace S = ::quickstep::serialization;
+
+block_id_domain getBlockDomain(const std::string &network_address,
+ const client_id cli_id,
+ client_id *locator_client_id,
+ MessageBus *bus) {  // Sends a BlockDomainRegistrationMessage over 'bus' and returns the domain from the reply.
+ tmb::Address address;
+ address.All(true);  // The locator's client id is not known yet (it is an output below), so the request is broadcast.
+ // NOTE(zuyu): The singleton BlockLocator would need only one copy of the message.
+ tmb::MessageStyle style;
+
+ S::BlockDomainRegistrationMessage proto;
+ proto.set_domain_network_address(network_address);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(std::malloc(proto_length));  // NOTE(review): malloc result is not null-checked.
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainRegistrationMessage);
+ std::free(proto_bytes);  // Presumably safe because TaggedMessage copies the payload on construction -- confirm.
+
+ DLOG(INFO) << "Client (id '" << cli_id
+ << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+ << "') to BlockLocator.";
+
+ CHECK(MessageBus::SendStatus::kOK ==
+ bus->Send(cli_id, address, style, std::move(message)));  // Any status other than kOK aborts the process.
+
+ const tmb::AnnotatedMessage annotated_message(bus->Receive(cli_id, 0, true));  // Presumably a blocking receive -- confirm against tmb.
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());  // NOTE(review): assumes the next message for cli_id is the response.
+
+ *locator_client_id = annotated_message.sender;  // Report who answered so the caller learns the BlockLocator's id.
+
+ DLOG(INFO) << "Client (id '" << cli_id
+ << "') received BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') from BlockLocator (id '" << *locator_client_id << "').";
+
+ S::BlockDomainMessage response_proto;
+ CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ return static_cast<block_id_domain>(response_proto.block_domain());
+}
+
+} // namespace block_locator
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp
new file mode 100644
index 0000000..74f65e4
--- /dev/null
+++ b/query_execution/BlockLocatorUtil.hpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
+
+#include <string>
+
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+namespace block_locator {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief Broadcast to find BlockLocator to get a block domain for
+ * StorageManager with the given network address.
+ *
+ * @param network_address The network address of the StorageManager.
+ * @param cli_id The client ID of the block domain requester, used both to send the request and to receive the response.
+ * @param locator_client_id Output: set to the sender of the response, i.e. the client ID of BlockLocator. Must not be null.
+ * @param bus A pointer to the TMB. Must not be null.
+ *
+ * @return The requested block domain. CHECK-fails on a send error, an unexpected response type, or an unparsable response.
+ **/
+block_id_domain getBlockDomain(const std::string &network_address,
+ const tmb::client_id cli_id,
+ tmb::client_id *locator_client_id,
+ tmb::MessageBus *bus);
+
+/** @} */
+
+} // namespace block_locator
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index eec0029..719d9f3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,6 +31,7 @@ endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+ add_library(quickstep_queryexecution_BlockLocatorUtil BlockLocatorUtil.cpp BlockLocatorUtil.hpp)
endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
if (ENABLE_DISTRIBUTED)
@@ -83,6 +84,12 @@ if (ENABLE_DISTRIBUTED)
quickstep_threading_ThreadUtil
quickstep_utility_Macros
tmb)
+ target_link_libraries(quickstep_queryexecution_BlockLocatorUtil
+ glog
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_storage_StorageBlockInfo
+ tmb)
endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanBase
glog
@@ -345,6 +352,7 @@ target_link_libraries(quickstep_queryexecution
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_ForemanDistributed
quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryManagerDistributed
@@ -363,6 +371,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelation
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 465f2a3..32437c3 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -26,6 +26,7 @@
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogRelation.hpp"
#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -71,7 +72,6 @@ class BlockLocatorTest : public ::testing::Test {
bus_.Initialize();
locator_.reset(new BlockLocator(&bus_));
- locator_client_id_ = locator_->getBusClientID();
locator_->start();
worker_client_id_ = bus_.Connect();
@@ -84,7 +84,9 @@ class BlockLocatorTest : public ::testing::Test {
bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
- block_domain_ = getBlockDomain(kDomainNetworkAddress);
+ block_domain_ =
+ block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_);
+ DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
storage_manager_.reset(
new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_));
@@ -168,42 +170,6 @@ class BlockLocatorTest : public ::testing::Test {
unique_ptr<StorageManager> storage_manager_;
private:
- block_id_domain getBlockDomain(const string &network_address) {
- serialization::BlockDomainRegistrationMessage proto;
- proto.set_domain_network_address(network_address);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kBlockDomainRegistrationMessage);
- free(proto_bytes);
-
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- worker_client_id_,
- locator_client_id_,
- move(message)));
-
- const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- EXPECT_EQ(locator_client_id_, annotated_message.sender);
- EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') received BlockDomainRegistrationResponseMessage from BlockLocator";
-
- serialization::BlockDomainMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- return static_cast<block_id_domain>(response_proto.block_domain());
- }
-
MessageBusImpl bus_;
unique_ptr<BlockLocator> locator_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 9c764e4..b987322 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -115,8 +115,8 @@ if (ENABLE_DISTRIBUTED)
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_ForemanDistributed
- quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Shiftboss
@@ -127,7 +127,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_storage_DataExchangerAsync
- quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
quickstep_utility_Macros
quickstep_utility_MemStream
quickstep_utility_SqlError
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 0403e77..2351dcd 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -32,14 +32,14 @@
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/ForemanDistributed.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "storage/DataExchangerAsync.hpp"
-#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
#include "utility/MemStream.hpp"
#include "utility/SqlError.hpp"
@@ -81,14 +81,15 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
block_locator_ = make_unique<BlockLocator>(&bus_);
- locator_client_id_ = block_locator_->getBusClientID();
block_locator_->start();
test_database_loader_ = make_unique<TestDatabaseLoader>(
storage_path,
- getBlockDomain(test_database_loader_data_exchanger_.network_address()),
+ block_locator::getBlockDomain(
+ test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
locator_client_id_,
&bus_);
+ DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
test_database_loader_data_exchanger_.start();
@@ -111,7 +112,11 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes));
auto storage_manager = make_unique<StorageManager>(
- storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_);
+ storage_path,
+ block_locator::getBlockDomain(
+ data_exchangers_[i].network_address(), cli_id_, &locator_client_id_, &bus_),
+ locator_client_id_, &bus_);
+ DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
data_exchangers_[i].set_storage_manager(storage_manager.get());
shiftbosses_.push_back(
@@ -193,44 +198,5 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
}
}
-block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain(
- const string &network_address) {
- serialization::BlockDomainRegistrationMessage proto;
- proto.set_domain_network_address(network_address);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kBlockDomainRegistrationMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "Client (id '" << cli_id_
- << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- cli_id_,
- locator_client_id_,
- move(message)));
-
- const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- CHECK_EQ(locator_client_id_, annotated_message.sender);
- CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
- DLOG(INFO) << "Client (id '" << cli_id_
- << "') received BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') from BlockLocator";
-
- serialization::BlockDomainMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- return static_cast<block_id_domain>(response_proto.block_domain());
-}
-
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index d2b13e4..63e320d 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -38,7 +38,7 @@
#include "query_optimizer/Optimizer.hpp"
#include "query_optimizer/tests/TestDatabaseLoader.hpp"
#include "storage/DataExchangerAsync.hpp"
-#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
#include "utility/Macros.hpp"
#include "utility/textbased_test/TextBasedTestRunner.hpp"
@@ -115,8 +115,6 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
std::string *output) override;
private:
- block_id_domain getBlockDomain(const std::string &network_address);
-
std::size_t query_id_;
SqlParserWrapper sql_parser_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index be60662..559d86d 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -862,7 +862,7 @@ target_link_libraries(quickstep_storage_PartitionedHashTablePool
quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
quickstep_utility_Macros
- quickstep_utility_StringUtil)
+ quickstep_utility_StringUtil)
target_link_libraries(quickstep_storage_PreloaderThread
glog
quickstep_catalog_CatalogDatabase
@@ -1502,7 +1502,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_BlockLocator
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_storage_CountedReference
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index 9c75150..ac39728 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -27,7 +27,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/BlockLocator.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/CountedReference.hpp"
@@ -77,7 +77,6 @@ class DataExchangeTest : public ::testing::Test {
bus_.Initialize();
locator_.reset(new BlockLocator(&bus_));
- locator_client_id_ = locator_->getBusClientID();
locator_->start();
worker_client_id_ = bus_.Connect();
@@ -88,18 +87,22 @@ class DataExchangeTest : public ::testing::Test {
storage_manager_expected_.reset(new StorageManager(
kStoragePath,
- getBlockDomain(data_exchanger_expected_.network_address()),
+ block_locator::getBlockDomain(
+ data_exchanger_expected_.network_address(), worker_client_id_, &locator_client_id_, &bus_),
locator_client_id_,
&bus_));
+ DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
data_exchanger_expected_.start();
storage_manager_checked_.reset(new StorageManager(
kStoragePath,
- getBlockDomain(kCheckedDomainNetworkAddress),
+ block_locator::getBlockDomain(
+ kCheckedDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_),
locator_client_id_,
&bus_));
+ DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
}
virtual void TearDown() {
@@ -123,45 +126,6 @@ class DataExchangeTest : public ::testing::Test {
unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
private:
- block_id_domain getBlockDomain(const string &network_address) {
- serialization::BlockDomainRegistrationMessage proto;
- proto.set_domain_network_address(network_address);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kBlockDomainRegistrationMessage);
- free(proto_bytes);
-
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
- << "') to BlockLocator (id '" << locator_client_id_ << "')";
-
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_,
- worker_client_id_,
- locator_client_id_,
- move(message)));
-
- const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- EXPECT_EQ(locator_client_id_, annotated_message.sender);
- EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
-
- LOG(INFO) << "Worker (id '" << worker_client_id_
- << "') received BlockDomainRegistrationResponseMessage (typed '"
- << kBlockDomainRegistrationResponseMessage
- << "') from BlockLocator";
-
- serialization::BlockDomainMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- return static_cast<block_id_domain>(response_proto.block_domain());
- }
-
MessageBusImpl bus_;
unique_ptr<BlockLocator> locator_;