You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:00 UTC

[08/50] incubator-quickstep git commit: Use BlockLocator and DataExchangerAsync in the distributed tests.

Use BlockLocator and DataExchangerAsync in the distributed tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/787a3251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/787a3251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/787a3251

Branch: refs/heads/quickstep_partition_parser_support
Commit: 787a3251019162610ebe13efbd341b3f9ac7a268
Parents: 3093e74
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Nov 4 23:12:09 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Nov 14 20:38:04 2016 -0800

----------------------------------------------------------------------
 query_optimizer/tests/CMakeLists.txt            |   8 ++
 .../DistributedExecutionGeneratorTestRunner.cpp | 104 ++++++++++++++++---
 .../DistributedExecutionGeneratorTestRunner.hpp |  32 +++++-
 query_optimizer/tests/TestDatabaseLoader.hpp    |  59 +++++++++--
 storage/DataExchangerAsync.cpp                  |   4 +-
 5 files changed, 178 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index ac4548a..9c764e4 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -79,6 +79,10 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_StorageBlockInfo)
+endif(ENABLE_DISTRIBUTED)
 
 if (ENABLE_DISTRIBUTED)
   add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
@@ -110,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_cli_PrintToScreen
                         quickstep_parser_ParseStatement
                         quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_Shiftboss
@@ -120,6 +126,8 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryoptimizer_OptimizerContext
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlockInfo
                         quickstep_utility_Macros
                         quickstep_utility_MemStream
                         quickstep_utility_SqlError

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5cccc21..0403e77 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -20,18 +20,26 @@
 #include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
 
 #include <cstdio>
+#include <cstdlib>
+#include <memory>
 #include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/MemStream.hpp"
 #include "utility/SqlError.hpp"
 
@@ -41,10 +49,15 @@
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
 
-using std::string;
+using std::free;
 using std::make_unique;
+using std::malloc;
+using std::move;
+using std::string;
 using std::vector;
 
+using tmb::TaggedMessage;
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -56,10 +69,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
 
 DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
     : query_id_(0),
-      test_database_loader_(storage_path) {
-  test_database_loader_.createTestRelation(false /* allow_vchar */);
-  test_database_loader_.loadTestRelation();
-
+      data_exchangers_(kNumInstances) {
   bus_.Initialize();
 
   cli_id_ = bus_.Connect();
@@ -67,9 +77,27 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
 
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  locator_client_id_ = block_locator_->getBusClientID();
+  block_locator_->start();
+
+  test_database_loader_ = make_unique<TestDatabaseLoader>(
+      storage_path,
+      getBlockDomain(test_database_loader_data_exchanger_.network_address()),
+      locator_client_id_,
+      &bus_);
+  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+  test_database_loader_data_exchanger_.start();
+
+  test_database_loader_->createTestRelation(false /* allow_vchar */);
+  test_database_loader_->loadTestRelation();
+
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_.catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
@@ -78,17 +106,24 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   for (int i = 0; i < kNumInstances; ++i) {
     workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
 
-    const vector<tmb::client_id> worker_client_ids(1, workers_[i]->getBusClientID());
+    const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
     worker_directories_.push_back(
         make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes));
 
+    auto storage_manager = make_unique<StorageManager>(
+        storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_);
+
+    data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, test_database_loader_.storage_manager(), worker_directories_[i].get()));
+        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+
+    storage_managers_.push_back(move(storage_manager));
   }
 
   foreman_->start();
 
   for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].start();
     shiftbosses_[i]->start();
     workers_[i]->start();
   }
@@ -101,9 +136,9 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
   VLOG(4) << "Test SQL(s): " << input;
 
   if (options.find(kResetOption) != options.end()) {
-    test_database_loader_.clear();
-    test_database_loader_.createTestRelation(false /* allow_vchar */);
-    test_database_loader_.loadTestRelation();
+    test_database_loader_->clear();
+    test_database_loader_->createTestRelation(false /* allow_vchar */);
+    test_database_loader_->loadTestRelation();
   }
 
   MemStream output_stream;
@@ -125,7 +160,7 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
       QueryHandle query_handle(query_id_++, cli_id_);
 
       optimizer_.generateQueryHandle(parse_statement,
-                                     test_database_loader_.catalog_database(),
+                                     test_database_loader_->catalog_database(),
                                      &optimizer_context,
                                      &query_handle);
 
@@ -141,11 +176,11 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
       const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
       if (query_result_relation) {
           PrintToScreen::PrintRelation(*query_result_relation,
-                                       test_database_loader_.storage_manager(),
+                                       test_database_loader_->storage_manager(),
                                        output_stream.file());
           DropRelation::Drop(*query_result_relation,
-                             test_database_loader_.catalog_database(),
-                             test_database_loader_.storage_manager());
+                             test_database_loader_->catalog_database(),
+                             test_database_loader_->storage_manager());
       }
     } catch (const SqlError &error) {
       *output = error.formatMessage(input);
@@ -158,5 +193,44 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
   }
 }
 
+block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain(
+    const string &network_address) {  // Returns the block domain BlockLocator replies with.
+  serialization::BlockDomainRegistrationMessage proto;  // Request carrying the address.
+  proto.set_domain_network_address(network_address);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));  // Released by free() below.
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kBlockDomainRegistrationMessage);
+  free(proto_bytes);  // NOTE(review): assumes TaggedMessage copied the bytes -- confirm.
+
+  DLOG(INFO) << "Client (id '" << cli_id_
+             << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+             << "') to BlockLocator (id '" << locator_client_id_ << "')";  // Logged before the send below.
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_,
+                                         cli_id_,
+                                         locator_client_id_,
+                                         move(message)));
+
+  const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  CHECK_EQ(locator_client_id_, annotated_message.sender);  // Reply must come from BlockLocator.
+  CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+  DLOG(INFO) << "Client (id '" << cli_id_
+             << "') received BlockDomainRegistrationResponseMessage (typed '"
+             << kBlockDomainRegistrationResponseMessage
+             << "') from BlockLocator";
+
+  serialization::BlockDomainMessage response_proto;
+  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  return static_cast<block_id_domain>(response_proto.block_domain());  // Domain taken from the reply.
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index ab10841..d2b13e4 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -28,6 +28,7 @@
 #include <vector>
 
 #include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -36,6 +37,8 @@
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/Optimizer.hpp"
 #include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 #include "utility/textbased_test/TextBasedTestRunner.hpp"
 
@@ -86,6 +89,25 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
     }
 
     foreman_->join();
+
+    test_database_loader_data_exchanger_.shutdown();
+    test_database_loader_.reset();
+    for (int i = 0; i < kNumInstances; ++i) {
+      data_exchangers_[i].shutdown();
+      storage_managers_[i].reset();
+    }
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           cli_id_,
+                                           locator_client_id_,
+                                           tmb::TaggedMessage(quickstep::kPoisonMessage)));
+
+    test_database_loader_data_exchanger_.join();
+    for (int i = 0; i < kNumInstances; ++i) {
+      data_exchangers_[i].join();
+    }
+    block_locator_->join();
   }
 
   void runTestCase(const std::string &input,
@@ -93,20 +115,26 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
                    std::string *output) override;
 
  private:
+  block_id_domain getBlockDomain(const std::string &network_address);
+
   std::size_t query_id_;
 
   SqlParserWrapper sql_parser_;
-  TestDatabaseLoader test_database_loader_;
+  std::unique_ptr<TestDatabaseLoader> test_database_loader_;
+  DataExchangerAsync test_database_loader_data_exchanger_;
   Optimizer optimizer_;
 
   MessageBusImpl bus_;
+  tmb::client_id cli_id_, locator_client_id_;
 
-  tmb::client_id cli_id_;
+  std::unique_ptr<BlockLocator> block_locator_;
 
   std::unique_ptr<ForemanDistributed> foreman_;
 
   std::vector<std::unique_ptr<Worker>> workers_;
   std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
+  std::vector<DataExchangerAsync> data_exchangers_;
+  std::vector<std::unique_ptr<StorageManager>> storage_managers_;
   std::vector<std::unique_ptr<Shiftboss>> shiftbosses_;
 
   DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/TestDatabaseLoader.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.hpp b/query_optimizer/tests/TestDatabaseLoader.hpp
index d49719d..87c19c6 100644
--- a/query_optimizer/tests/TestDatabaseLoader.hpp
+++ b/query_optimizer/tests/TestDatabaseLoader.hpp
@@ -24,12 +24,21 @@
 
 #include "catalog/CatalogDatabase.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/StorageBlockInfo.hpp"
+#endif  // QUICKSTEP_DISTRIBUTED
+
 #include "storage/StorageManager.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
 
+#ifdef QUICKSTEP_DISTRIBUTED
+namespace tmb { class MessageBus; }
+#endif  // QUICKSTEP_DISTRIBUTED
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -60,18 +69,34 @@ class TestDatabaseLoader {
                           0 /* id */),
         storage_manager_(storage_path),
         test_relation_(nullptr) {
-    bus_.Initialize();
-
-    const tmb::client_id worker_thread_client_id = bus_.Connect();
-    bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
-
-    // Refer to InsertDestination::sendBlockFilledMessage for the rationale
-    // behind using ClientIDMap.
-    thread_id_map_->addValue(worker_thread_client_id);
+    init();
+  }
 
-    scheduler_client_id_ = bus_.Connect();
-    bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Constructor for the distributed version; the extra arguments go to StorageManager.
+   *
+   * @param storage_path A filesystem directory where the blocks may be
+   *                     evicted to during the execution of a test query.
+   *                     Can be empty if the test query is not executed
+   *                     in the query engine.
+   * @param block_domain The block_domain for StorageManager.
+   * @param locator_client_id The client id of BlockLocator for StorageManager.
+   * @param bus_global The bus handed to StorageManager (not this loader's own bus_).
+   */
+  TestDatabaseLoader(const std::string &storage_path,
+                     const block_id_domain block_domain,
+                     const tmb::client_id locator_client_id,
+                     tmb::MessageBus *bus_global)
+      : thread_id_map_(ClientIDMap::Instance()),
+        catalog_database_(nullptr /* parent */,
+                          "TestDatabase" /* name */,
+                          0 /* id */),
+        storage_manager_(storage_path, block_domain, locator_client_id, bus_global),
+        test_relation_(nullptr) {
+    init();  // Same bus_ setup as the non-distributed constructor.
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
 
   ~TestDatabaseLoader() {
     clear();
@@ -139,6 +164,20 @@ class TestDatabaseLoader {
   void clear();
 
  private:
+  void init() {  // Setup shared by both constructors for the loader's own bus_.
+    bus_.Initialize();
+
+    const tmb::client_id worker_thread_client_id = bus_.Connect();  // Sender-side client.
+    bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
+
+    // Refer to InsertDestination::sendBlockFilledMessage for the rationale
+    // behind using ClientIDMap.
+    thread_id_map_->addValue(worker_thread_client_id);
+
+    scheduler_client_id_ = bus_.Connect();  // Receiver-side client for the same message type.
+    bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+  }
+
   /**
    * @brief Simulate Foreman to add all new blocks to the relation.
    */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
index 59f5ebf..1d2f7db 100644
--- a/storage/DataExchangerAsync.cpp
+++ b/storage/DataExchangerAsync.cpp
@@ -155,11 +155,11 @@ void DataExchangerAsync::run() {
       if (ok) {
         call_context->Proceed();
       } else {
-        LOG(WARNING) << "Not ok\n";
+        LOG(WARNING) << "DataExchangerAsync " << server_address_ << " is not ok";
         delete call_context;
       }
     } else {
-      LOG(INFO) << "Shutdown\n";
+      LOG(INFO) << "DataExchangerAsync " << server_address_ << " shuts down";
       return;
     }
   }