You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/03/15 19:01:17 UTC

[58/62] [abbrv] incubator-quickstep git commit: Refactored '\analyze' in the distributed version.

Refactored '\analyze' in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6ec765cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6ec765cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6ec765cb

Branch: refs/heads/LIP-time-decomposition
Commit: 6ec765cb037230210f1a358dda47b85dd1305fe1
Parents: d383591
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 01:25:29 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:23:32 2017 -0700

----------------------------------------------------------------------
 cli/distributed/Conductor.cpp | 75 ++++++++++++++++++++++++--------------
 cli/distributed/Conductor.hpp |  3 --
 2 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index ef253f1..8b0ba03 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -210,11 +210,9 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       query_processor_->generateQueryHandle(statement, query_handle.get());
       DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
-      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-          conductor_client_id_,
-          foreman_->getBusClientID(),
-          query_handle.release(),
-          &bus_);
+      CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+              conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_));
     }
   } catch (const SqlError &sql_error) {
     // Set the query execution status along with the error message.
@@ -252,11 +250,16 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
     }
   }
 
+  SqlParserWrapper parser_wrapper;
+  std::vector<QueryHandle*> query_handles;
+
   // Analyze each relation in the database.
   for (const CatalogRelation &relation : relations) {
     const relation_id rel_id = relation.getID();
     const string rel_name = EscapeQuotes(relation.getName(), '"');
 
+    string *query_string = nullptr;
+
     // Get the number of distinct values for each column.
     for (const CatalogAttribute &attribute : relation) {
       const string attr_name = EscapeQuotes(attribute.getName(), '"');
@@ -269,7 +272,7 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
       // NOTE(jianqiao): Note that the relation name and the attribute names may
       // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
       // So here we need to format the names with double quotes (").
-      string *query_string = new string("SELECT COUNT(DISTINCT \"");
+      query_string = new string("SELECT COUNT(DISTINCT \"");
       query_string->append(attr_name);
       query_string->append("\")");
       if (is_min_applicable) {
@@ -286,37 +289,55 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
       query_string->append(rel_name);
       query_string->append("\";");
 
-      submitQuery(sender, query_string,
-                  new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
-                                                    attribute.getID(), is_min_applicable, is_max_applicable));
+      parser_wrapper.feedNextBuffer(query_string);
+      const ParseResult parse_result = parser_wrapper.getNextStatement();
+      DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+      const ParseStatement &statement = *parse_result.parsed_statement;
+
+      // Generate the query plan.
+      auto query_handle =
+          make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+                                   new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id,
+                                                                     relations.size(), attribute.getID(),
+                                                                     is_min_applicable, is_max_applicable));
+      query_processor_->generateQueryHandle(statement, query_handle.get());
+      DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+      query_handles.push_back(query_handle.release());
     }
 
     // Get the number of tuples for the relation.
-    string *query_string = new string("SELECT COUNT(*) FROM \"");
+    query_string = new string("SELECT COUNT(*) FROM \"");
     query_string->append(rel_name);
     query_string->append("\";");
 
-    submitQuery(sender, query_string,
-                new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
-  }
-}
+    parser_wrapper.feedNextBuffer(query_string);
+    const ParseResult parse_result = parser_wrapper.getNextStatement();
+    DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
 
-void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
-  SqlParserWrapper parser_wrapper;
-  parser_wrapper.feedNextBuffer(query);
-  ParseResult parse_result = parser_wrapper.getNextStatement();
-  DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+    const ParseStatement &statement = *parse_result.parsed_statement;
 
-  const ParseStatement &statement = *parse_result.parsed_statement;
+    // Generate the query plan.
+    auto query_handle =
+        make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+                                 new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id,
+                                                                   relations.size()));
+    query_processor_->generateQueryHandle(statement, query_handle.get());
+    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
-  // Generate the query plan.
-  auto query_handle =
-      make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
-  query_processor_->generateQueryHandle(statement, query_handle.get());
-  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+    query_handles.push_back(query_handle.release());
+  }
+
+  if (!query_handles.empty()) {
+    auto request_message = make_unique<AdmitRequestMessage>(query_handles);
+    const std::size_t size_of_request_msg = sizeof(*request_message);
 
-  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-      conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(),
+                                           TaggedMessage(request_message.release(), size_of_request_msg,
+                                                         kAdmitRequestMessage)));
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 28c8e02..12c5b52 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,7 +26,6 @@
 #include "cli/distributed/Role.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
-#include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -66,8 +65,6 @@ class Conductor final : public Role {
 
   void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
 
-  void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
-
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;