You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/13 09:19:07 UTC
[1/3] incubator-quickstep git commit: Minor refactored
AdmitRequestMessage constructor. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-analyze-refactor a16a7f71b -> 6939cadc4 (forced update)
Minor refactored AdmitRequestMessage constructor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/70e76637
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/70e76637
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/70e76637
Branch: refs/heads/dist-analyze-refactor
Commit: 70e76637f0f0b90c3e1fc9db31a70a195e2609d1
Parents: 14fcd61
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 02:00:37 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:00:37 2017 -0700
----------------------------------------------------------------------
query_execution/AdmitRequestMessage.hpp | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/70e76637/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index dd3d49b..d93fd91 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -51,9 +51,8 @@ class AdmitRequestMessage {
*
* @param query_handle The handle of the query requesting to be admitted.
**/
- explicit AdmitRequestMessage(QueryHandle *query_handle) {
- query_handles_.push_back(query_handle);
- }
+ explicit AdmitRequestMessage(QueryHandle *query_handle)
+ : query_handles_({ query_handle }) {}
/**
* @brief Get the query handles from this message.
@@ -63,7 +62,7 @@ class AdmitRequestMessage {
}
private:
- std::vector<QueryHandle*> query_handles_;
+ const std::vector<QueryHandle*> query_handles_;
DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
};
[3/3] incubator-quickstep git commit: Refactored '\analyze' in the
distributed version.
Posted by zu...@apache.org.
Refactored '\analyze' in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6939cadc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6939cadc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6939cadc
Branch: refs/heads/dist-analyze-refactor
Commit: 6939cadc44cb321f796bf977d1601a8dfc363d5e
Parents: d383591
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 01:25:29 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:18:47 2017 -0700
----------------------------------------------------------------------
cli/distributed/Conductor.cpp | 78 +++++++++++++++++++++++++-------------
cli/distributed/Conductor.hpp | 3 --
2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6939cadc/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index ef253f1..6f593f4 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -210,11 +210,12 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_,
- foreman_->getBusClientID(),
- query_handle.release(),
- &bus_);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_,
+ foreman_->getBusClientID(),
+ query_handle.release(),
+ &bus_));
}
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
@@ -252,11 +253,16 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
}
}
+ SqlParserWrapper parser_wrapper;
+ std::vector<QueryHandle*> query_handles;
+
// Analyze each relation in the database.
for (const CatalogRelation &relation : relations) {
const relation_id rel_id = relation.getID();
const string rel_name = EscapeQuotes(relation.getName(), '"');
+ string *query_string = nullptr;
+
// Get the number of distinct values for each column.
for (const CatalogAttribute &attribute : relation) {
const string attr_name = EscapeQuotes(attribute.getName(), '"');
@@ -269,7 +275,7 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
// NOTE(jianqiao): Note that the relation name and the attribute names may
// contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
// So here we need to format the names with double quotes (").
- string *query_string = new string("SELECT COUNT(DISTINCT \"");
+ query_string = new string("SELECT COUNT(DISTINCT \"");
query_string->append(attr_name);
query_string->append("\")");
if (is_min_applicable) {
@@ -286,37 +292,55 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
query_string->append(rel_name);
query_string->append("\";");
- submitQuery(sender, query_string,
- new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
- attribute.getID(), is_min_applicable, is_max_applicable));
+ parser_wrapper.feedNextBuffer(query_string);
+ const ParseResult parse_result = parser_wrapper.getNextStatement();
+ DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+
+ // Generate the query plan.
+ auto query_handle =
+ make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+ new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id,
+ relations.size(), attribute.getID(),
+ is_min_applicable, is_max_applicable));
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ query_handles.push_back(query_handle.release());
}
// Get the number of tuples for the relation.
- string *query_string = new string("SELECT COUNT(*) FROM \"");
+ query_string = new string("SELECT COUNT(*) FROM \"");
query_string->append(rel_name);
query_string->append("\";");
- submitQuery(sender, query_string,
- new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
- }
-}
+ parser_wrapper.feedNextBuffer(query_string);
+ const ParseResult parse_result = parser_wrapper.getNextStatement();
+ DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
-void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
- SqlParserWrapper parser_wrapper;
- parser_wrapper.feedNextBuffer(query);
- ParseResult parse_result = parser_wrapper.getNextStatement();
- DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+ const ParseStatement &statement = *parse_result.parsed_statement;
- const ParseStatement &statement = *parse_result.parsed_statement;
+ // Generate the query plan.
+ auto query_handle =
+ make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(),
+ new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id,
+ relations.size()));
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- // Generate the query plan.
- auto query_handle =
- make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
- query_processor_->generateQueryHandle(statement, query_handle.get());
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ query_handles.push_back(query_handle.release());
+ }
+
+ if (!query_handles.empty()) {
+ auto request_message = make_unique<AdmitRequestMessage>(query_handles);
+ const std::size_t size_of_request_msg = sizeof(*request_message);
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(),
+ TaggedMessage(request_message.release(), size_of_request_msg,
+ kAdmitRequestMessage)));
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6939cadc/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 28c8e02..12c5b52 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,7 +26,6 @@
#include "cli/distributed/Role.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
-#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
@@ -66,8 +65,6 @@ class Conductor final : public Role {
void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
- void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
-
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;
[2/3] incubator-quickstep git commit: Minor refactored the query
admission process in the distributed version.
Posted by zu...@apache.org.
Minor refactored the query admission process in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d383591b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d383591b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d383591b
Branch: refs/heads/dist-analyze-refactor
Commit: d383591b13775c74148ca3ff31ce882b283bccf7
Parents: 70e7663
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 02:10:58 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 02:10:58 2017 -0700
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 13 +------------
query_execution/PolicyEnforcerBase.cpp | 2 ++
2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d383591b/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 06fb5a1..d235170 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -165,19 +165,8 @@ void ForemanDistributed::run() {
const AdmitRequestMessage *request_message =
static_cast<const AdmitRequestMessage*>(tagged_message.message());
- const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
- DCHECK(!query_handles.empty());
-
- bool all_queries_admitted = true;
- if (query_handles.size() == 1u) {
- all_queries_admitted =
- policy_enforcer_->admitQuery(query_handles.front());
- } else {
- all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
- }
- if (!all_queries_admitted) {
+ if (!policy_enforcer_->admitQueries(request_message->getQueryHandles())) {
LOG(WARNING) << "The scheduler could not admit all the queries";
- // TODO(harshad) - Inform the main thread about the failure.
}
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d383591b/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 324bcb3..32f29a3 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -157,6 +157,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
bool PolicyEnforcerBase::admitQueries(
const std::vector<QueryHandle*> &query_handles) {
+ DCHECK(!query_handles.empty());
+
bool all_queries_admitted = true;
for (QueryHandle *curr_query : query_handles) {
if (all_queries_admitted) {