You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/03/15 19:01:17 UTC
[58/62] [abbrv] incubator-quickstep git commit: Refactored '\analyze'
in the distributed version.
Refactored '\analyze' in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6ec765cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6ec765cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6ec765cb
Branch: refs/heads/LIP-time-decomposition
Commit: 6ec765cb037230210f1a358dda47b85dd1305fe1
Parents: d383591
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 01:25:29 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:23:32 2017 -0700
----------------------------------------------------------------------
cli/distributed/Conductor.cpp | 75 ++++++++++++++++++++++++--------------
cli/distributed/Conductor.hpp | 3 --
2 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index ef253f1..8b0ba03 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -210,11 +210,9 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_,
- foreman_->getBusClientID(),
- query_handle.release(),
- &bus_);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_));
}
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
@@ -252,11 +250,16 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
}
}
+ SqlParserWrapper parser_wrapper;
+ std::vector<QueryHandle*> query_handles;
+
// Analyze each relation in the database.
for (const CatalogRelation &relation : relations) {
const relation_id rel_id = relation.getID();
const string rel_name = EscapeQuotes(relation.getName(), '"');
+ string *query_string = nullptr;
+
// Get the number of distinct values for each column.
for (const CatalogAttribute &attribute : relation) {
const string attr_name = EscapeQuotes(attribute.getName(), '"');
@@ -269,7 +272,7 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
// NOTE(jianqiao): Note that the relation name and the attribute names may
// contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
// So here we need to format the names with double quotes (").
- string *query_string = new string("SELECT COUNT(DISTINCT \"");
+ query_string = new string("SELECT COUNT(DISTINCT \"");
query_string->append(attr_name);
query_string->append("\")");
if (is_min_applicable) {
@@ -286,37 +289,55 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
query_string->append(rel_name);
query_string->append("\";");
- submitQuery(sender, query_string,
- new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
- attribute.getID(), is_min_applicable, is_max_applicable));
+ parser_wrapper.feedNextBuffer(query_string);
+ const ParseResult parse_result = parser_wrapper.getNextStatement();
+ DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+
+ // Generate the query plan.
+ auto query_handle =
+ make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+ new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id,
+ relations.size(), attribute.getID(),
+ is_min_applicable, is_max_applicable));
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ query_handles.push_back(query_handle.release());
}
// Get the number of tuples for the relation.
- string *query_string = new string("SELECT COUNT(*) FROM \"");
+ query_string = new string("SELECT COUNT(*) FROM \"");
query_string->append(rel_name);
query_string->append("\";");
- submitQuery(sender, query_string,
- new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
- }
-}
+ parser_wrapper.feedNextBuffer(query_string);
+ const ParseResult parse_result = parser_wrapper.getNextStatement();
+ DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
-void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
- SqlParserWrapper parser_wrapper;
- parser_wrapper.feedNextBuffer(query);
- ParseResult parse_result = parser_wrapper.getNextStatement();
- DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+ const ParseStatement &statement = *parse_result.parsed_statement;
- const ParseStatement &statement = *parse_result.parsed_statement;
+ // Generate the query plan.
+ auto query_handle =
+ make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+ new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id,
+ relations.size()));
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- // Generate the query plan.
- auto query_handle =
- make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
- query_processor_->generateQueryHandle(statement, query_handle.get());
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ query_handles.push_back(query_handle.release());
+ }
+
+ if (!query_handles.empty()) {
+ auto request_message = make_unique<AdmitRequestMessage>(query_handles);
+ const std::size_t size_of_request_msg = sizeof(*request_message);
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(),
+ TaggedMessage(request_message.release(), size_of_request_msg,
+ kAdmitRequestMessage)));
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 28c8e02..12c5b52 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,7 +26,6 @@
#include "cli/distributed/Role.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
-#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
@@ -66,8 +65,6 @@ class Conductor final : public Role {
void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
- void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
-
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;