You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:00 UTC
[08/50] incubator-quickstep git commit: Use BlockLocator and DataExchangerAsync in the distributed tests.
Use BlockLocator and DataExchangerAsync in the distributed tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/787a3251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/787a3251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/787a3251
Branch: refs/heads/quickstep_partition_parser_support
Commit: 787a3251019162610ebe13efbd341b3f9ac7a268
Parents: 3093e74
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 4 23:12:09 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Nov 14 20:38:04 2016 -0800
----------------------------------------------------------------------
query_optimizer/tests/CMakeLists.txt | 8 ++
.../DistributedExecutionGeneratorTestRunner.cpp | 104 ++++++++++++++++---
.../DistributedExecutionGeneratorTestRunner.hpp | 32 +++++-
query_optimizer/tests/TestDatabaseLoader.hpp | 59 +++++++++--
storage/DataExchangerAsync.cpp | 4 +-
5 files changed, 178 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index ac4548a..9c764e4 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -79,6 +79,10 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_types_containers_Tuple
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_storage_StorageBlockInfo)
+endif(ENABLE_DISTRIBUTED)
if (ENABLE_DISTRIBUTED)
add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
@@ -110,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Shiftboss
@@ -120,6 +126,8 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryoptimizer_OptimizerContext
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_storage_DataExchangerAsync
+ quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
quickstep_utility_MemStream
quickstep_utility_SqlError
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5cccc21..0403e77 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -20,18 +20,26 @@
#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
#include <cstdio>
+#include <cstdlib>
+#include <memory>
#include <set>
#include <string>
+#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/QueryHandle.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/MemStream.hpp"
#include "utility/SqlError.hpp"
@@ -41,10 +49,15 @@
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
-using std::string;
+using std::free;
using std::make_unique;
+using std::malloc;
+using std::move;
+using std::string;
using std::vector;
+using tmb::TaggedMessage;
+
namespace quickstep {
class CatalogRelation;
@@ -56,10 +69,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
: query_id_(0),
- test_database_loader_(storage_path) {
- test_database_loader_.createTestRelation(false /* allow_vchar */);
- test_database_loader_.loadTestRelation();
-
+ data_exchangers_(kNumInstances) {
bus_.Initialize();
cli_id_ = bus_.Connect();
@@ -67,9 +77,27 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+ bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+ block_locator_ = make_unique<BlockLocator>(&bus_);
+ locator_client_id_ = block_locator_->getBusClientID();
+ block_locator_->start();
+
+ test_database_loader_ = make_unique<TestDatabaseLoader>(
+ storage_path,
+ getBlockDomain(test_database_loader_data_exchanger_.network_address()),
+ locator_client_id_,
+ &bus_);
+ test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+ test_database_loader_data_exchanger_.start();
+
+ test_database_loader_->createTestRelation(false /* allow_vchar */);
+ test_database_loader_->loadTestRelation();
+
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_.catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
@@ -78,17 +106,24 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
for (int i = 0; i < kNumInstances; ++i) {
workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
- const vector<tmb::client_id> worker_client_ids(1, workers_[i]->getBusClientID());
+ const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
worker_directories_.push_back(
make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes));
+ auto storage_manager = make_unique<StorageManager>(
+ storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_);
+
+ data_exchangers_[i].set_storage_manager(storage_manager.get());
shiftbosses_.push_back(
- make_unique<Shiftboss>(&bus_, test_database_loader_.storage_manager(), worker_directories_[i].get()));
+ make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+
+ storage_managers_.push_back(move(storage_manager));
}
foreman_->start();
for (int i = 0; i < kNumInstances; ++i) {
+ data_exchangers_[i].start();
shiftbosses_[i]->start();
workers_[i]->start();
}
@@ -101,9 +136,9 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
VLOG(4) << "Test SQL(s): " << input;
if (options.find(kResetOption) != options.end()) {
- test_database_loader_.clear();
- test_database_loader_.createTestRelation(false /* allow_vchar */);
- test_database_loader_.loadTestRelation();
+ test_database_loader_->clear();
+ test_database_loader_->createTestRelation(false /* allow_vchar */);
+ test_database_loader_->loadTestRelation();
}
MemStream output_stream;
@@ -125,7 +160,7 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
QueryHandle query_handle(query_id_++, cli_id_);
optimizer_.generateQueryHandle(parse_statement,
- test_database_loader_.catalog_database(),
+ test_database_loader_->catalog_database(),
&optimizer_context,
&query_handle);
@@ -141,11 +176,11 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
- test_database_loader_.storage_manager(),
+ test_database_loader_->storage_manager(),
output_stream.file());
DropRelation::Drop(*query_result_relation,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager());
+ test_database_loader_->catalog_database(),
+ test_database_loader_->storage_manager());
}
} catch (const SqlError &error) {
*output = error.formatMessage(input);
@@ -158,5 +193,44 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
}
}
+block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain(
+ const string &network_address) {
+ serialization::BlockDomainRegistrationMessage proto;
+ proto.set_domain_network_address(network_address);
+
+ const int proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kBlockDomainRegistrationMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Client (id '" << cli_id_
+ << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+ << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ cli_id_,
+ locator_client_id_,
+ move(message)));
+
+ const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ CHECK_EQ(locator_client_id_, annotated_message.sender);
+ CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+ DLOG(INFO) << "Client (id '" << cli_id_
+ << "') received BlockDomainRegistrationResponseMessage (typed '"
+ << kBlockDomainRegistrationResponseMessage
+ << "') from BlockLocator";
+
+ serialization::BlockDomainMessage response_proto;
+ CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ return static_cast<block_id_domain>(response_proto.block_domain());
+}
+
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index ab10841..d2b13e4 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -28,6 +28,7 @@
#include <vector>
#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -36,6 +37,8 @@
#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/Optimizer.hpp"
#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "utility/textbased_test/TextBasedTestRunner.hpp"
@@ -86,6 +89,25 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
}
foreman_->join();
+
+ test_database_loader_data_exchanger_.shutdown();
+ test_database_loader_.reset();
+ for (int i = 0; i < kNumInstances; ++i) {
+ data_exchangers_[i].shutdown();
+ storage_managers_[i].reset();
+ }
+
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_,
+ cli_id_,
+ locator_client_id_,
+ tmb::TaggedMessage(quickstep::kPoisonMessage)));
+
+ test_database_loader_data_exchanger_.join();
+ for (int i = 0; i < kNumInstances; ++i) {
+ data_exchangers_[i].join();
+ }
+ block_locator_->join();
}
void runTestCase(const std::string &input,
@@ -93,20 +115,26 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
std::string *output) override;
private:
+ block_id_domain getBlockDomain(const std::string &network_address);
+
std::size_t query_id_;
SqlParserWrapper sql_parser_;
- TestDatabaseLoader test_database_loader_;
+ std::unique_ptr<TestDatabaseLoader> test_database_loader_;
+ DataExchangerAsync test_database_loader_data_exchanger_;
Optimizer optimizer_;
MessageBusImpl bus_;
+ tmb::client_id cli_id_, locator_client_id_;
- tmb::client_id cli_id_;
+ std::unique_ptr<BlockLocator> block_locator_;
std::unique_ptr<ForemanDistributed> foreman_;
std::vector<std::unique_ptr<Worker>> workers_;
std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
+ std::vector<DataExchangerAsync> data_exchangers_;
+ std::vector<std::unique_ptr<StorageManager>> storage_managers_;
std::vector<std::unique_ptr<Shiftboss>> shiftbosses_;
DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/TestDatabaseLoader.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.hpp b/query_optimizer/tests/TestDatabaseLoader.hpp
index d49719d..87c19c6 100644
--- a/query_optimizer/tests/TestDatabaseLoader.hpp
+++ b/query_optimizer/tests/TestDatabaseLoader.hpp
@@ -24,12 +24,21 @@
#include "catalog/CatalogDatabase.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/StorageBlockInfo.hpp"
+#endif // QUICKSTEP_DISTRIBUTED
+
#include "storage/StorageManager.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace tmb { class MessageBus; }
+#endif // QUICKSTEP_DISTRIBUTED
+
namespace quickstep {
class CatalogRelation;
@@ -60,18 +69,34 @@ class TestDatabaseLoader {
0 /* id */),
storage_manager_(storage_path),
test_relation_(nullptr) {
- bus_.Initialize();
-
- const tmb::client_id worker_thread_client_id = bus_.Connect();
- bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
-
- // Refer to InsertDestination::sendBlockFilledMessage for the rationale
- // behind using ClientIDMap.
- thread_id_map_->addValue(worker_thread_client_id);
+ init();
+ }
- scheduler_client_id_ = bus_.Connect();
- bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Constructor for the distributed version.
+ *
+ * @param storage_path A filesystem directory where the blocks may be
+ * evicted to during the execution of a test query.
+ * Can be empty if the test query is not executed
+ * in the query engine.
+ * @param block_domain The block_domain for StorageManager.
+ * @param locator_client_id The client id of BlockLocator for StorageManager.
+ * @param bus_global The Bus for StorageManager.
+ */
+ TestDatabaseLoader(const std::string &storage_path,
+ const block_id_domain block_domain,
+ const tmb::client_id locator_client_id,
+ tmb::MessageBus *bus_global)
+ : thread_id_map_(ClientIDMap::Instance()),
+ catalog_database_(nullptr /* parent */,
+ "TestDatabase" /* name */,
+ 0 /* id */),
+ storage_manager_(storage_path, block_domain, locator_client_id, bus_global),
+ test_relation_(nullptr) {
+ init();
}
+#endif // QUICKSTEP_DISTRIBUTED
~TestDatabaseLoader() {
clear();
@@ -139,6 +164,20 @@ class TestDatabaseLoader {
void clear();
private:
+ void init() {
+ bus_.Initialize();
+
+ const tmb::client_id worker_thread_client_id = bus_.Connect();
+ bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
+
+ // Refer to InsertDestination::sendBlockFilledMessage for the rationale
+ // behind using ClientIDMap.
+ thread_id_map_->addValue(worker_thread_client_id);
+
+ scheduler_client_id_ = bus_.Connect();
+ bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+ }
+
/**
* @brief Simulate Foreman to add all new blocks to the relation.
*/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
index 59f5ebf..1d2f7db 100644
--- a/storage/DataExchangerAsync.cpp
+++ b/storage/DataExchangerAsync.cpp
@@ -155,11 +155,11 @@ void DataExchangerAsync::run() {
if (ok) {
call_context->Proceed();
} else {
- LOG(WARNING) << "Not ok\n";
+ LOG(WARNING) << "DataExchangerAsync " << server_address_ << " is not ok";
delete call_context;
}
} else {
- LOG(INFO) << "Shutdown\n";
+ LOG(INFO) << "DataExchangerAsync " << server_address_ << " shuts down";
return;
}
}