You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/13 09:19:07 UTC

[1/3] incubator-quickstep git commit: Minor refactoring of the AdmitRequestMessage constructor. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-analyze-refactor a16a7f71b -> 6939cadc4 (forced update)


Minor refactoring of the AdmitRequestMessage constructor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/70e76637
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/70e76637
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/70e76637

Branch: refs/heads/dist-analyze-refactor
Commit: 70e76637f0f0b90c3e1fc9db31a70a195e2609d1
Parents: 14fcd61
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 02:00:37 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:00:37 2017 -0700

----------------------------------------------------------------------
 query_execution/AdmitRequestMessage.hpp | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/70e76637/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index dd3d49b..d93fd91 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -51,9 +51,8 @@ class AdmitRequestMessage {
    *
    * @param query_handle The handle of the query requesting to be admitted.
    **/
-  explicit AdmitRequestMessage(QueryHandle *query_handle) {
-    query_handles_.push_back(query_handle);
-  }
+  explicit AdmitRequestMessage(QueryHandle *query_handle)
+      : query_handles_({ query_handle }) {}
 
   /**
    * @brief Get the query handles from this message.
@@ -63,7 +62,7 @@ class AdmitRequestMessage {
   }
 
  private:
-  std::vector<QueryHandle*> query_handles_;
+  const std::vector<QueryHandle*> query_handles_;
 
   DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
 };


[3/3] incubator-quickstep git commit: Refactored '\analyze' in the distributed version.

Posted by zu...@apache.org.
Refactored '\analyze' in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6939cadc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6939cadc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6939cadc

Branch: refs/heads/dist-analyze-refactor
Commit: 6939cadc44cb321f796bf977d1601a8dfc363d5e
Parents: d383591
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 01:25:29 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:18:47 2017 -0700

----------------------------------------------------------------------
 cli/distributed/Conductor.cpp | 78 +++++++++++++++++++++++++-------------
 cli/distributed/Conductor.hpp |  3 --
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6939cadc/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index ef253f1..6f593f4 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -210,11 +210,12 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       query_processor_->generateQueryHandle(statement, query_handle.get());
       DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
-      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-          conductor_client_id_,
-          foreman_->getBusClientID(),
-          query_handle.release(),
-          &bus_);
+      CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+              conductor_client_id_,
+              foreman_->getBusClientID(),
+              query_handle.release(),
+              &bus_));
     }
   } catch (const SqlError &sql_error) {
     // Set the query execution status along with the error message.
@@ -252,11 +253,16 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
     }
   }
 
+  SqlParserWrapper parser_wrapper;
+  std::vector<QueryHandle*> query_handles;
+
   // Analyze each relation in the database.
   for (const CatalogRelation &relation : relations) {
     const relation_id rel_id = relation.getID();
     const string rel_name = EscapeQuotes(relation.getName(), '"');
 
+    string *query_string = nullptr;
+
     // Get the number of distinct values for each column.
     for (const CatalogAttribute &attribute : relation) {
       const string attr_name = EscapeQuotes(attribute.getName(), '"');
@@ -269,7 +275,7 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
       // NOTE(jianqiao): Note that the relation name and the attribute names may
       // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
       // So here we need to format the names with double quotes (").
-      string *query_string = new string("SELECT COUNT(DISTINCT \"");
+      query_string = new string("SELECT COUNT(DISTINCT \"");
       query_string->append(attr_name);
       query_string->append("\")");
       if (is_min_applicable) {
@@ -286,37 +292,55 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
       query_string->append(rel_name);
       query_string->append("\";");
 
-      submitQuery(sender, query_string,
-                  new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
-                                                    attribute.getID(), is_min_applicable, is_max_applicable));
+      parser_wrapper.feedNextBuffer(query_string);
+      const ParseResult parse_result = parser_wrapper.getNextStatement();
+      DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+      const ParseStatement &statement = *parse_result.parsed_statement;
+
+      // Generate the query plan.
+      auto query_handle =
+          make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+                                   new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id,
+                                                                     relations.size(), attribute.getID(),
+                                                                     is_min_applicable, is_max_applicable));
+      query_processor_->generateQueryHandle(statement, query_handle.get());
+      DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+      query_handles.push_back(query_handle.release());
     }
 
     // Get the number of tuples for the relation.
-    string *query_string = new string("SELECT COUNT(*) FROM \"");
+    query_string = new string("SELECT COUNT(*) FROM \"");
     query_string->append(rel_name);
     query_string->append("\";");
 
-    submitQuery(sender, query_string,
-                new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
-  }
-}
+    parser_wrapper.feedNextBuffer(query_string);
+    const ParseResult parse_result = parser_wrapper.getNextStatement();
+    DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
 
-void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
-  SqlParserWrapper parser_wrapper;
-  parser_wrapper.feedNextBuffer(query);
-  ParseResult parse_result = parser_wrapper.getNextStatement();
-  DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+    const ParseStatement &statement = *parse_result.parsed_statement;
 
-  const ParseStatement &statement = *parse_result.parsed_statement;
+    // Generate the query plan.
+    auto query_handle =
+        make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+                                 new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id,
+                                                                   relations.size()));
+    query_processor_->generateQueryHandle(statement, query_handle.get());
+    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
-  // Generate the query plan.
-  auto query_handle =
-      make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
-  query_processor_->generateQueryHandle(statement, query_handle.get());
-  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+    query_handles.push_back(query_handle.release());
+  }
+
+  if (!query_handles.empty()) {
+    auto request_message = make_unique<AdmitRequestMessage>(query_handles);
+    const std::size_t size_of_request_msg = sizeof(*request_message);
 
-  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-      conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(),
+                                           TaggedMessage(request_message.release(), size_of_request_msg,
+                                                         kAdmitRequestMessage)));
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6939cadc/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 28c8e02..12c5b52 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,7 +26,6 @@
 #include "cli/distributed/Role.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
-#include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -66,8 +65,6 @@ class Conductor final : public Role {
 
   void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
 
-  void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
-
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;


[2/3] incubator-quickstep git commit: Minor refactoring of the query admission process in the distributed version.

Posted by zu...@apache.org.
Minor refactoring of the query admission process in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d383591b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d383591b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d383591b

Branch: refs/heads/dist-analyze-refactor
Commit: d383591b13775c74148ca3ff31ce882b283bccf7
Parents: 70e7663
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 02:10:58 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:10:58 2017 -0700

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp | 13 +------------
 query_execution/PolicyEnforcerBase.cpp |  2 ++
 2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d383591b/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 06fb5a1..d235170 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -165,19 +165,8 @@ void ForemanDistributed::run() {
         const AdmitRequestMessage *request_message =
             static_cast<const AdmitRequestMessage*>(tagged_message.message());
 
-        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
-        DCHECK(!query_handles.empty());
-
-        bool all_queries_admitted = true;
-        if (query_handles.size() == 1u) {
-          all_queries_admitted =
-              policy_enforcer_->admitQuery(query_handles.front());
-        } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
-        }
-        if (!all_queries_admitted) {
+        if (!policy_enforcer_->admitQueries(request_message->getQueryHandles())) {
           LOG(WARNING) << "The scheduler could not admit all the queries";
-          // TODO(harshad) - Inform the main thread about the failure.
         }
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d383591b/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 324bcb3..32f29a3 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -157,6 +157,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
 
 bool PolicyEnforcerBase::admitQueries(
     const std::vector<QueryHandle*> &query_handles) {
+  DCHECK(!query_handles.empty());
+
   bool all_queries_admitted = true;
   for (QueryHandle *curr_query : query_handles) {
     if (all_queries_admitted) {