You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:30 UTC

[38/50] incubator-quickstep git commit: Transferred the ownership of QueryHandle to QueryManager.

Transferred the ownership of QueryHandle to QueryManager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c608e99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c608e99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c608e99e

Branch: refs/heads/quickstep_partition_parser_support
Commit: c608e99eff883afacafa2f0fefce99a0513f9963
Parents: 57730e4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 20 12:04:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 18:00:34 2016 -0800

----------------------------------------------------------------------
 cli/CommandExecutor.cpp                         | 30 +++++++------
 cli/QuickstepCli.cpp                            | 46 ++++++++++----------
 cli/tests/CommandExecutorTestRunner.cpp         | 31 +++++++------
 query_execution/QueryManagerBase.hpp            |  7 ++-
 .../tests/QueryManagerSingleNode_unittest.cpp   |  8 ++--
 .../DistributedExecutionGeneratorTestRunner.cpp | 34 ++++++++-------
 .../tests/ExecutionGeneratorTestRunner.cpp      | 35 ++++++++-------
 7 files changed, 99 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 4ab32de..3c510e7 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -213,25 +213,27 @@ inline std::vector<TypedValue> executeQueryForSingleRow(
   DCHECK(result.condition == ParseResult::kSuccess);
 
   const ParseStatement &statement = *result.parsed_statement;
+  const CatalogRelation *query_result_relation = nullptr;
 
-  // Generate the query plan.
-  std::unique_ptr<QueryHandle> query_handle(
-      std::make_unique<QueryHandle>(query_processor->query_id(),
-                                    main_thread_client_id,
-                                    statement.getPriority()));
-  query_processor->generateQueryHandle(statement, query_handle.get());
-  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
-  // Use foreman to execute the query plan.
-  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-      main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+  {
+    // Generate the query plan.
+    auto query_handle =
+        std::make_unique<QueryHandle>(query_processor->query_id(),
+                                      main_thread_client_id,
+                                      statement.getPriority());
+    query_processor->generateQueryHandle(statement, query_handle.get());
+    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+    query_result_relation = query_handle->getQueryResultRelation();
+    DCHECK(query_result_relation != nullptr);
+
+    // Use foreman to execute the query plan.
+    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+        main_thread_client_id, foreman_client_id, query_handle.release(), bus);
+  }
 
   QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus);
 
   // Retrieve the scalar result from the result relation.
-  const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
-  DCHECK(query_result_relation != nullptr);
-
   std::vector<TypedValue> values;
   {
     std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index f4816a8..9db7577 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -341,38 +341,41 @@ int main(int argc, char* argv[]) {
           continue;
         }
 
-        std::unique_ptr<QueryHandle> query_handle(
-            std::make_unique<QueryHandle>(query_processor->query_id(),
-                                          main_thread_client_id,
-                                          statement.getPriority()));
+        const std::size_t query_id = query_processor->query_id();
+        const CatalogRelation *query_result_relation = nullptr;
+        std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer;
+
         try {
+          auto query_handle = std::make_unique<QueryHandle>(query_id,
+                                                            main_thread_client_id,
+                                                            statement.getPriority());
           query_processor->generateQueryHandle(statement, query_handle.get());
+          DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+          if (quickstep::FLAGS_visualize_execution_dag) {
+            dag_visualizer =
+                std::make_unique<quickstep::ExecutionDAGVisualizer>(*query_handle->getQueryPlanMutable());
+          }
+
+          query_result_relation = query_handle->getQueryResultRelation();
+
+          start = std::chrono::steady_clock::now();
+          QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+              main_thread_client_id,
+              foreman.getBusClientID(),
+              query_handle.release(),
+              &bus);
         } catch (const quickstep::SqlError &sql_error) {
           fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
           reset_parser = true;
           break;
         }
 
-        DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-        std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer;
-        if (quickstep::FLAGS_visualize_execution_dag) {
-          dag_visualizer.reset(
-              new quickstep::ExecutionDAGVisualizer(*query_handle->getQueryPlanMutable()));
-        }
-
-        start = std::chrono::steady_clock::now();
-        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-            main_thread_client_id,
-            foreman.getBusClientID(),
-            query_handle.get(),
-            &bus);
-
         try {
           QueryExecutionUtil::ReceiveQueryCompletionMessage(
               main_thread_client_id, &bus);
           end = std::chrono::steady_clock::now();
 
-          const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
           if (query_result_relation) {
             PrintToScreen::PrintRelation(*query_result_relation,
                                          &storage_manager,
@@ -394,12 +397,11 @@ int main(int argc, char* argv[]) {
                      time_ms.count(), 3).c_str());
           if (quickstep::FLAGS_profile_and_report_workorder_perf) {
             // TODO(harshad) - Allow user specified file instead of stdout.
-            foreman.printWorkOrderProfilingResults(query_handle->query_id(),
-                                                   stdout);
+            foreman.printWorkOrderProfilingResults(query_id, stdout);
           }
           if (quickstep::FLAGS_visualize_execution_dag) {
             const auto &profiling_stats =
-                foreman.getWorkOrderProfilingResults(query_handle->query_id());
+                foreman.getWorkOrderProfilingResults(query_id);
             dag_visualizer->bindProfilingStats(profiling_stats);
             std::cerr << "\n" << dag_visualizer->toDOT() << "\n";
           }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 41cc9da..e352b24 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -20,6 +20,7 @@
 #include "cli/tests/CommandExecutorTestRunner.hpp"
 
 #include <cstdio>
+#include <memory>
 #include <set>
 #include <string>
 #include <utility>
@@ -88,28 +89,26 @@ void CommandExecutorTestRunner::runTestCase(
               nullptr,
               output_stream.file());
         } else {
-          QueryHandle query_handle(0 /* query_id */, main_thread_client_id_);
-          O::OptimizerContext optimizer_context;
-
-          optimizer_.generateQueryHandle(parse_statement,
-                                         test_database_loader_.catalog_database(),
-                                         &optimizer_context,
-                                         &query_handle);
-
-          AdmitRequestMessage request_message(&query_handle);
-          TaggedMessage admit_tagged_message(
-              &request_message, sizeof(request_message), kAdmitRequestMessage);
-          QueryExecutionUtil::SendTMBMessage(&bus_,
-                                             main_thread_client_id_,
-                                             foreman_->getBusClientID(),
-                                             std::move(admit_tagged_message));
+          const CatalogRelation *query_result_relation = nullptr;
+          {
+            auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);
+            O::OptimizerContext optimizer_context;
+
+            optimizer_.generateQueryHandle(parse_statement,
+                                           test_database_loader_.catalog_database(),
+                                           &optimizer_context,
+                                           query_handle.get());
+            query_result_relation = query_handle->getQueryResultRelation();
+
+            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+                main_thread_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+          }
 
           // Receive workload completion message from Foreman.
           const AnnotatedMessage annotated_msg =
               bus_.Receive(main_thread_client_id_, 0, true);
           const TaggedMessage &tagged_message = annotated_msg.tagged_message;
           DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
-          const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
           if (query_result_relation) {
             PrintToScreen::PrintRelation(*query_result_relation,
                                          test_database_loader_.storage_manager(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index a274742..ddb76d5 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -34,8 +35,6 @@
 
 namespace quickstep {
 
-class QueryHandle;
-
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -77,7 +76,7 @@ class QueryManagerBase {
    * @brief Get the query handle.
    **/
   const QueryHandle* query_handle() const {
-    return query_handle_;
+    return query_handle_.get();
   }
 
   /**
@@ -259,7 +258,7 @@ class QueryManagerBase {
     return query_exec_state_->hasRebuildInitiated(index);
   }
 
-  const QueryHandle *query_handle_;
+  std::unique_ptr<QueryHandle> query_handle_;
 
   const std::size_t query_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index f65ec53..6ec6521 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -234,14 +234,14 @@ class QueryManagerTest : public ::testing::Test {
     db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
     storage_manager_.reset(new StorageManager("./"));
     bus_.Initialize();
-    query_handle_.reset(new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */));
+    query_handle_ = new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */);
     query_plan_ = query_handle_->getQueryPlanMutable();
     query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
   }
 
   inline void constructQueryManager() {
     query_manager_.reset(new QueryManagerSingleNode(
-        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
+        0, 1, query_handle_, db_.get(), storage_manager_.get(), &bus_));
   }
 
   inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
@@ -291,8 +291,8 @@ class QueryManagerTest : public ::testing::Test {
   unique_ptr<CatalogDatabase> db_;
   unique_ptr<StorageManager> storage_manager_;
 
-  QueryPlan *query_plan_;
-  unique_ptr<QueryHandle> query_handle_;
+  QueryPlan *query_plan_;  // Owned by 'query_handle_'.
+  QueryHandle* query_handle_;  // Owned by 'query_manager_'.
   unique_ptr<QueryManagerSingleNode> query_manager_;
 
   MessageBusImpl bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2351dcd..5100651 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -160,37 +160,39 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
 
     const ParseStatement &parse_statement = *result.parsed_statement;
     std::printf("%s\n", parse_statement.toString().c_str());
+
+    const CatalogRelation *query_result_relation = nullptr;
     try {
       OptimizerContext optimizer_context;
-      QueryHandle query_handle(query_id_++, cli_id_);
+      auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
 
       optimizer_.generateQueryHandle(parse_statement,
                                      test_database_loader_->catalog_database(),
                                      &optimizer_context,
-                                     &query_handle);
+                                     query_handle.get());
+      query_result_relation = query_handle->getQueryResultRelation();
 
       QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
           cli_id_,
           foreman_->getBusClientID(),
-          &query_handle,
+          query_handle.release(),
           &bus_);
-
-      const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
-      DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
-
-      const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
-      if (query_result_relation) {
-          PrintToScreen::PrintRelation(*query_result_relation,
-                                       test_database_loader_->storage_manager(),
-                                       output_stream.file());
-          DropRelation::Drop(*query_result_relation,
-                             test_database_loader_->catalog_database(),
-                             test_database_loader_->storage_manager());
-      }
     } catch (const SqlError &error) {
       *output = error.formatMessage(input);
       break;
     }
+
+    const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+    DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+    if (query_result_relation) {
+      PrintToScreen::PrintRelation(*query_result_relation,
+                                   test_database_loader_->storage_manager(),
+                                   output_stream.file());
+      DropRelation::Drop(*query_result_relation,
+                         test_database_loader_->catalog_database(),
+                         test_database_loader_->storage_manager());
+    }
   }
 
   if (output->empty()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 06397d4..ee9bee7 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/tests/ExecutionGeneratorTestRunner.hpp"
 
 #include <cstdio>
+#include <memory>
 #include <set>
 #include <string>
 
@@ -71,37 +72,39 @@ void ExecutionGeneratorTestRunner::runTestCase(
     } else {
       const ParseStatement &parse_statement = *result.parsed_statement;
       std::printf("%s\n", parse_statement.toString().c_str());
+
+      const CatalogRelation *query_result_relation = nullptr;
       try {
-        QueryHandle query_handle(0 /* query_id */, main_thread_client_id_);
         OptimizerContext optimizer_context;
+        auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);
 
         optimizer_.generateQueryHandle(parse_statement,
                                        test_database_loader_.catalog_database(),
                                        &optimizer_context,
-                                       &query_handle);
+                                       query_handle.get());
+        query_result_relation = query_handle->getQueryResultRelation();
 
         QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
             main_thread_client_id_,
             foreman_->getBusClientID(),
-            &query_handle,
+            query_handle.release(),
             &bus_);
-
-        QueryExecutionUtil::ReceiveQueryCompletionMessage(
-            main_thread_client_id_, &bus_);
-
-        const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
-        if (query_result_relation) {
-            PrintToScreen::PrintRelation(*query_result_relation,
-                                         test_database_loader_.storage_manager(),
-                                         output_stream.file());
-            DropRelation::Drop(*query_result_relation,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager());
-        }
       } catch (const SqlError &error) {
         *output = error.formatMessage(input);
         break;
       }
+
+      QueryExecutionUtil::ReceiveQueryCompletionMessage(
+          main_thread_client_id_, &bus_);
+
+      if (query_result_relation) {
+        PrintToScreen::PrintRelation(*query_result_relation,
+                                     test_database_loader_.storage_manager(),
+                                     output_stream.file());
+        DropRelation::Drop(*query_result_relation,
+                           test_database_loader_.catalog_database(),
+                           test_database_loader_.storage_manager());
+      }
     }
   }