You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/03 12:14:45 UTC
[1/8] incubator-quickstep git commit: Minor refactor regarding
CatalogDatabase. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-patch 57ca76249 -> c5aa04127 (forced update)
Minor refactor regarding CatalogDatabase.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/964a8064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/964a8064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/964a8064
Branch: refs/heads/dist-patch
Commit: 964a80649f37502020c35a15401236f3f48719a2
Parents: 87bbb26
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 16:02:20 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 16:02:20 2017 -0800
----------------------------------------------------------------------
catalog/CatalogDatabase.hpp | 2 +-
catalog/CatalogDatabaseCache.hpp | 2 +-
query_execution/ForemanSingleNode.cpp | 3 +--
query_execution/ForemanSingleNode.hpp | 1 -
4 files changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/catalog/CatalogDatabase.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabase.hpp b/catalog/CatalogDatabase.hpp
index b071d09..86cfcba 100644
--- a/catalog/CatalogDatabase.hpp
+++ b/catalog/CatalogDatabase.hpp
@@ -138,7 +138,7 @@ class RelationIdNotFound : public std::exception {
/**
* @brief A single database in the catalog.
**/
-class CatalogDatabase : public CatalogDatabaseLite {
+class CatalogDatabase final : public CatalogDatabaseLite {
public:
typedef std::unordered_map<std::string, CatalogRelation*>::size_type size_type;
typedef PtrVector<CatalogRelation, true>::const_skip_iterator const_iterator;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/catalog/CatalogDatabaseCache.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 88ca61d..88e23ab 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -54,7 +54,7 @@ namespace serialization { class CatalogDatabase; }
* @note A CatalogRelationSchema should be kept unless all associated blocks
* have been deleted.
**/
-class CatalogDatabaseCache : public CatalogDatabaseLite {
+class CatalogDatabaseCache final : public CatalogDatabaseLite {
public:
/**
* @brief Constructor.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 02799c7..3eac0ff 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -67,7 +67,6 @@ ForemanSingleNode::ForemanSingleNode(
: ForemanBase(bus, cpu_id),
main_thread_client_id_(main_thread_client_id),
worker_directory_(DCHECK_NOTNULL(worker_directory)),
- catalog_database_(DCHECK_NOTNULL(catalog_database)),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
kPoisonMessage,
@@ -95,7 +94,7 @@ ForemanSingleNode::ForemanSingleNode(
policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>(
foreman_client_id_,
num_numa_nodes,
- catalog_database_,
+ catalog_database,
storage_manager_,
worker_directory_,
bus_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index d2db51b..9fa3b9a 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -111,7 +111,6 @@ class ForemanSingleNode final : public ForemanBase {
WorkerDirectory *worker_directory_;
- CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
[3/8] incubator-quickstep git commit: Refactored command execution in
the distributed version.
Posted by zu...@apache.org.
Refactored command execution in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f25d1c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f25d1c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f25d1c1
Branch: refs/heads/dist-patch
Commit: 7f25d1c1473def1f6b5733bf55673f91d11d0195
Parents: ccb2852
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 21:30:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 21:30:28 2017 -0800
----------------------------------------------------------------------
cli/distributed/Cli.cpp | 54 +++------
cli/distributed/Conductor.cpp | 137 ++++++++--------------
cli/distributed/Conductor.hpp | 5 -
query_execution/QueryExecutionMessages.proto | 4 -
query_execution/QueryExecutionTypedefs.hpp | 4 +-
5 files changed, 67 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 49b7dc1..63f3259 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -123,7 +123,7 @@ void Cli::init() {
data_exchanger_.set_storage_manager(storage_manager_.get());
data_exchanger_.start();
- // Prepare for submitting a query.
+ // Prepare for submitting a query or a command.
bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
@@ -131,8 +131,6 @@ void Cli::init() {
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
- // Prepare for submitting a command.
- bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
}
@@ -166,50 +164,36 @@ void Cli::run() {
}
if (statement.getStatementType() == ParseStatement::kCommand) {
- const ParseCommand &command = static_cast<const ParseCommand &>(statement);
- const std::string &command_str = command.command()->value();
+ const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+ const std::string &command = parse_command.command()->value();
try {
- if (command_str == C::kAnalyzeCommand) {
+ if (command == C::kAnalyzeCommand) {
// TODO(zuyu): support '\analyze'.
- THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
- } else if (command_str != C::kDescribeDatabaseCommand &&
- command_str != C::kDescribeTableCommand) {
- THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+ THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
+ } else if (command != C::kDescribeDatabaseCommand &&
+ command != C::kDescribeTableCommand) {
+ THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
}
} catch (const SqlError &error) {
fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
+ }
- DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
- << "') to Conductor";
- S::CommandMessage proto;
- proto.set_command(*command_string);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
- free(proto_bytes);
-
- QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
- } else {
- DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
- << "') to Conductor";
- S::SqlQueryMessage proto;
- proto.set_sql_query(*command_string);
+ DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+ << "') to Conductor";
+ S::SqlQueryMessage proto;
+ proto.set_sql_query(*command_string);
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
- free(proto_bytes);
+ TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+ free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
- }
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
start = std::chrono::steady_clock::now();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index b877b04..1b8bfb2 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -95,21 +95,18 @@ void Conductor::init() {
bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
- bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
- bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
-
bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+ bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
- foreman_ = make_unique<ForemanDistributed>(*block_locator_,
- std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
- catalog_database_);
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
foreman_->start();
}
@@ -132,14 +129,6 @@ void Conductor::run() {
QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
break;
}
- case kCommandMessage: {
- S::CommandMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
- DLOG(INFO) << "Conductor received the following command: " << proto.command();
-
- processCommandMessage(sender, new string(move(proto.command())));
- break;
- }
case kSqlQueryMessage: {
S::SqlQueryMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -161,91 +150,59 @@ void Conductor::run() {
}
}
-void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
- parser_wrapper_.feedNextBuffer(command_string);
- ParseResult parse_result = parser_wrapper_.getNextStatement();
-
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+ SqlParserWrapper parser_wrapper;
+ parser_wrapper.feedNextBuffer(command_string);
+ ParseResult parse_result = parser_wrapper.getNextStatement();
CHECK(parse_result.condition == ParseResult::kSuccess)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
- DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
-
- const ParseCommand &command = static_cast<const ParseCommand &>(statement);
- const PtrVector<ParseString> &arguments = *(command.arguments());
- const string &command_str = command.command()->value();
-
- string command_response;
try {
- if (command_str == C::kDescribeDatabaseCommand) {
- command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
- } else if (command_str == C::kDescribeTableCommand) {
- if (arguments.empty()) {
+ if (statement.getStatementType() == ParseStatement::kCommand) {
+ const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+ const PtrVector<ParseString> &arguments = *(parse_command.arguments());
+ const string &command = parse_command.command()->value();
+
+ string command_response;
+ if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
- } else {
- command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+ } else if (command == C::kDescribeTableCommand) {
+ if (arguments.empty()) {
+ command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+ } else {
+ command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+ }
}
- }
- } catch (const SqlError &command_error) {
- // Set the query execution status along with the error message.
- S::QueryExecutionErrorMessage proto;
- proto.set_error_message(command_error.formatMessage(*command_string));
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryExecutionErrorMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
- << kQueryExecutionErrorMessage
- << "') to Distributed CLI " << sender;
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
- }
-
- S::CommandResponseMessage proto;
- proto.set_command_response(command_response);
-
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
- << "') to Distributed CLI " << sender;
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-}
-
-void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
- parser_wrapper_.feedNextBuffer(command_string);
- ParseResult parse_result = parser_wrapper_.getNextStatement();
-
- CHECK(parse_result.condition == ParseResult::kSuccess)
- << "Any SQL syntax error should be addressed in the DistributedCli.";
-
- const ParseStatement &statement = *parse_result.parsed_statement;
- DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
-
- try {
- auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
- sender,
- statement.getPriority());
- query_processor_->generateQueryHandle(statement, query_handle.get());
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- conductor_client_id_,
- foreman_->getBusClientID(),
- query_handle.release(),
- &bus_);
+ S::CommandResponseMessage proto;
+ proto.set_command_response(command_response);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+ << "') to Distributed CLI " << sender;
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+ } else {
+ auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+ sender,
+ statement.getPriority());
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_,
+ foreman_->getBusClientID(),
+ query_handle.release(),
+ &bus_);
+ }
} catch (const SqlError &sql_error) {
// Set the query execution status along with the error message.
S::QueryExecutionErrorMessage proto;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e7e003f..0c0f7e5 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -24,7 +24,6 @@
#include <string>
#include "cli/distributed/Role.hpp"
-#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
#include "query_optimizer/QueryProcessor.hpp"
@@ -61,12 +60,8 @@ class Conductor final : public Role {
void run() override;
private:
- void processCommandMessage(const tmb::client_id sender, std::string *command_string);
-
void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
- SqlParserWrapper parser_wrapper_;
-
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 47246d8..a45e8df 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,10 +81,6 @@ message ShiftbossRegistrationResponseMessage {
required uint64 shiftboss_index = 1;
}
-message CommandMessage {
- required string command = 1;
-}
-
message SqlQueryMessage {
required string sql_query = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0fd0bdf..a49de5e 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,9 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
kDistributedCliRegistrationMessage, // From CLI to Conductor.
kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
- // From CLI to Conductor.
- kCommandMessage,
- kSqlQueryMessage,
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
[6/8] incubator-quickstep git commit: Set the default
tmb_implementation value to purememory.
Posted by zu...@apache.org.
Set the default tmb_implementation value to purememory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/04b5f7e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/04b5f7e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/04b5f7e0
Branch: refs/heads/dist-patch
Commit: 04b5f7e07973cfc2e9267a8d15178920ad5d88c5
Parents: e37ec54
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:17:55 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800
----------------------------------------------------------------------
third_party/src/tmb/src/tmb_net_server.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/04b5f7e0/third_party/src/tmb/src/tmb_net_server.cc
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/src/tmb_net_server.cc b/third_party/src/tmb/src/tmb_net_server.cc
index c2be7ef..abe0efe 100644
--- a/third_party/src/tmb/src/tmb_net_server.cc
+++ b/third_party/src/tmb/src/tmb_net_server.cc
@@ -256,7 +256,7 @@ static bool ValidateTmbImplementation(const char *flagname,
"Invalid value for --%s: %s\n", flagname, value.c_str());
return false;
}
-DEFINE_string(tmb_implementation, "nativelog",
+DEFINE_string(tmb_implementation, "purememory",
"Which underlying TMB implementation to use. Valid choices are "
"leveldb, nativelog, purememory, sqlite, voltdb, and zookeeper. "
"Depending on how the TMB library was built, some of these may "
[5/8] incubator-quickstep git commit: Enabled DAG visualization by default.
Posted by zu...@apache.org.
Enabled DAG visualization by default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1c6e16c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1c6e16c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1c6e16c6
Branch: refs/heads/dist-patch
Commit: 1c6e16c6825fe9e74d1f46f12a44c382f309b648
Parents: 04b5f7e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:40:33 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800
----------------------------------------------------------------------
query_execution/QueryManagerBase.cpp | 2 +-
query_optimizer/tests/DistributedExecutionGeneratorTest.cpp | 3 +++
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c6e16c6/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f84ad4e..2dd5467 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -37,7 +37,7 @@ using std::pair;
namespace quickstep {
-DEFINE_bool(visualize_execution_dag, false,
+DEFINE_bool(visualize_execution_dag, true,
"If true, visualize the execution plan DAG into a graph in DOT "
"format (DOT is a plain text graph description language) which is "
"then printed via stderr.");
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c6e16c6/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index b18b5ec..9b96e12 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -43,6 +43,8 @@ using std::make_unique;
QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
+namespace quickstep { DECLARE_bool(visualize_execution_dag); }
+
int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
@@ -52,6 +54,7 @@ int main(int argc, char** argv) {
quickstep::optimizer::FLAGS_use_filter_joins = false;
// Honor FLAGS_buffer_pool_slots in StorageManager.
+ quickstep::FLAGS_visualize_execution_dag = false;
gflags::ParseCommandLineFlags(&argc, &argv, true);
if (argc < 4) {
[7/8] incubator-quickstep git commit: Disabled LIP in the distributed
version.
Posted by zu...@apache.org.
Disabled LIP in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/09d6f731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/09d6f731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/09d6f731
Branch: refs/heads/dist-patch
Commit: 09d6f731e62bb00b900f1f073f4049ba2f686953
Parents: 1c6e16c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 12 17:48:53 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800
----------------------------------------------------------------------
query_optimizer/PhysicalGenerator.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/09d6f731/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index ac51c31..ca6db3f 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -59,7 +59,7 @@ DEFINE_bool(reorder_hash_joins, true,
"cardinality and selective tables to be joined first, which is suitable "
"for queries on star-schema tables.");
-DEFINE_bool(use_filter_joins, true,
+DEFINE_bool(use_filter_joins, false,
"If true, apply an optimization that strength-reduces HashJoins to "
"FilterJoins (implemented as LIPFilters attached to some anchoring "
"operators. Briefly speaking, in the case that the join attribute has "
@@ -67,7 +67,7 @@ DEFINE_bool(use_filter_joins, true,
"build a BitVector on the build-side attribute and use the BitVector "
"to filter the probe side table.");
-DEFINE_bool(use_lip_filters, true,
+DEFINE_bool(use_lip_filters, false,
"If true, use LIP (Lookahead Information Passing) filters to accelerate "
"query processing. LIP filters are effective for queries on star schema "
"tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "
[8/8] incubator-quickstep git commit: Refactored block loading order
in StorageManager::loadBlockOrBlob.
Posted by zu...@apache.org.
Refactored block loading order in StorageManager::loadBlockOrBlob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c5aa0412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c5aa0412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c5aa0412
Branch: refs/heads/dist-patch
Commit: c5aa041270cd4db8ae9b71af70f325ebda855598
Parents: 09d6f73
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:24:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800
----------------------------------------------------------------------
storage/StorageManager.cpp | 60 ++++++++++++++++++++---------------------
1 file changed, 29 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5aa0412/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 94e1b67..b920c17 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -666,45 +666,43 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
// already loaded before this function gets called.
BlockHandle loaded_handle;
-#ifdef QUICKSTEP_DISTRIBUTED
- // TODO(quickstep-team): Use a cost model to determine whether to load from
- // a remote peer or the disk.
- if (BlockIdUtil::Domain(block) != block_domain_) {
- DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
- const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
- for (const string &peer_domain_network_address : peer_domain_network_addresses) {
- DataExchangerClientAsync client(
- grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
- this);
-
- if (client.Pull(block, numa_node, &loaded_handle)) {
- sendBlockLocationMessage(block, kAddBlockLocationMessage);
- return loaded_handle;
- }
- }
+ const size_t num_slots = file_manager_->numSlots(block);
+ if (num_slots != 0) {
+ void *block_buffer = allocateSlots(num_slots, numa_node);
- DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
- }
-#endif
+ const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+ CHECK(status) << "Failed to read block from persistent storage: " << block;
- const size_t num_slots = file_manager_->numSlots(block);
- DEBUG_ASSERT(num_slots != 0);
- void *block_buffer = allocateSlots(num_slots, numa_node);
+ loaded_handle.block_memory = block_buffer;
+ loaded_handle.block_memory_size = num_slots;
- const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
- CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+ if (bus_) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ }
+#endif
- loaded_handle.block_memory = block_buffer;
- loaded_handle.block_memory_size = num_slots;
+ return loaded_handle;
+ }
#ifdef QUICKSTEP_DISTRIBUTED
- if (bus_) {
- sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ // TODO(quickstep-team): Use a cost model to determine whether to load from
+ // a remote peer or the disk.
+ DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+ const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+ for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+ DataExchangerClientAsync client(
+ grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+ this);
+
+ if (client.Pull(block, numa_node, &loaded_handle)) {
+ sendBlockLocationMessage(block, kAddBlockLocationMessage);
+ return loaded_handle;
+ }
}
+ LOG(FATAL) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+ << " from remote peers.";
#endif
-
- return loaded_handle;
}
void StorageManager::insertBlockHandleAfterLoad(const block_id block,
[4/8] incubator-quickstep git commit: Added \analyze support in the
distributed version.
Posted by zu...@apache.org.
Added \analyze support in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e37ec541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e37ec541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e37ec541
Branch: refs/heads/dist-patch
Commit: e37ec541c1490d8434eb441a1c0389e20f80ec6e
Parents: 7f25d1c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 23:36:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:09:36 2017 -0800
----------------------------------------------------------------------
cli/CMakeLists.txt | 2 +-
cli/distributed/CMakeLists.txt | 7 +
cli/distributed/Cli.cpp | 8 +-
cli/distributed/Conductor.cpp | 99 ++++++++++++-
cli/distributed/Conductor.hpp | 8 +-
query_execution/CMakeLists.txt | 12 ++
query_execution/ForemanDistributed.cpp | 25 +++-
query_execution/ForemanDistributed.hpp | 12 +-
query_execution/PolicyEnforcerDistributed.cpp | 155 ++++++++++++++++++---
query_execution/PolicyEnforcerDistributed.hpp | 14 ++
query_execution/QueryExecutionTypedefs.hpp | 2 +-
query_optimizer/CMakeLists.txt | 1 +
query_optimizer/QueryHandle.hpp | 43 +++++-
13 files changed, 351 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index ffeed2f..4562fe6 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -84,6 +84,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_expressions_aggregation_AggregateFunctionMax
quickstep_expressions_aggregation_AggregateFunctionMin
quickstep_parser_ParseStatement
+ quickstep_parser_ParseString
quickstep_parser_SqlParserWrapper
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryPlan
@@ -93,7 +94,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_storage_StorageManager
quickstep_storage_TupleIdSequence
quickstep_storage_TupleStorageSubBlock
- quickstep_parser_ParseString
quickstep_types_Type
quickstep_types_TypeID
quickstep_types_TypedValue
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 1f7dee0..2db27e5 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,13 +25,19 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
# Link dependencies:
target_link_libraries(quickstep_cli_distributed_Conductor
glog
+ quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
quickstep_cli_CommandExecutorUtil
quickstep_cli_Constants
quickstep_cli_DefaultsConfigurator
quickstep_cli_Flags
quickstep_cli_distributed_Role
+ quickstep_expressions_aggregation_AggregateFunctionMax
+ quickstep_expressions_aggregation_AggregateFunctionMin
quickstep_parser_ParseStatement
+ quickstep_parser_ParseString
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanDistributed
@@ -42,6 +48,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
quickstep_queryoptimizer_QueryProcessor
quickstep_storage_StorageConstants
quickstep_utility_Macros
+ quickstep_utility_PtrVector
quickstep_utility_SqlError
quickstep_utility_StringUtil
tmb)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 63f3259..14880a7 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -167,11 +167,9 @@ void Cli::run() {
const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
const std::string &command = parse_command.command()->value();
try {
- if (command == C::kAnalyzeCommand) {
- // TODO(zuyu): support '\analyze'.
- THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
- } else if (command != C::kDescribeDatabaseCommand &&
- command != C::kDescribeTableCommand) {
+ if (command != C::kAnalyzeCommand &&
+ command != C::kDescribeDatabaseCommand &&
+ command != C::kDescribeTableCommand) {
THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
}
} catch (const SqlError &error) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 1b8bfb2..5fb4453 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -27,13 +27,20 @@
#include <sstream>
#include <string>
#include <utility>
+#include <vector>
+#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "cli/CommandExecutorUtil.hpp"
#include "cli/Constants.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
+#include "expressions/aggregation/AggregateFunctionMax.hpp"
+#include "expressions/aggregation/AggregateFunctionMin.hpp"
#include "parser/ParseStatement.hpp"
+#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
@@ -43,6 +50,7 @@
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConstants.hpp"
+#include "utility/PtrVector.hpp"
#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
@@ -69,6 +77,8 @@ namespace quickstep {
namespace C = cli;
namespace S = serialization;
+class Type;
+
void Conductor::init() {
try {
string catalog_path = FLAGS_storage_path + kCatalogFilename;
@@ -154,7 +164,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
SqlParserWrapper parser_wrapper;
parser_wrapper.feedNextBuffer(command_string);
ParseResult parse_result = parser_wrapper.getNextStatement();
- CHECK(parse_result.condition == ParseResult::kSuccess)
+ CHECK_EQ(ParseResult::kSuccess, parse_result.condition)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
@@ -165,6 +175,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
const PtrVector<ParseString> &arguments = *(parse_command.arguments());
const string &command = parse_command.command()->value();
+ if (command == C::kAnalyzeCommand) {
+ executeAnalyze(sender, arguments);
+ return;
+ }
+
string command_response;
if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
@@ -225,4 +240,86 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
}
}
+void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) {
+ std::vector<std::reference_wrapper<const CatalogRelation>> relations;
+ if (arguments.empty()) {
+ relations.insert(relations.end(), catalog_database_->begin(), catalog_database_->end());
+ } else {
+ for (const auto &argument : arguments) {
+ const CatalogRelation *relation = catalog_database_->getRelationByName(argument.value());
+ if (relation == nullptr) {
+ THROW_SQL_ERROR_AT(&argument) << "Table does not exist";
+ }
+
+ relations.emplace_back(*relation);
+ }
+ }
+
+ // Analyze each relation in the database.
+ for (const CatalogRelation &relation : relations) {
+ const relation_id rel_id = relation.getID();
+ const string rel_name = EscapeQuotes(relation.getName(), '"');
+
+ // Get the number of distinct values for each column.
+ for (const CatalogAttribute &attribute : relation) {
+ const string attr_name = EscapeQuotes(attribute.getName(), '"');
+ const Type &attr_type = attribute.getType();
+ const bool is_min_applicable =
+ AggregateFunctionMin::Instance().canApplyToTypes({&attr_type});
+ const bool is_max_applicable =
+ AggregateFunctionMax::Instance().canApplyToTypes({&attr_type});
+
+ // NOTE(jianqiao): Note that the relation name and the attribute names may
+ // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
+ // So here we need to format the names with double quotes (").
+ string *query_string = new string("SELECT COUNT(DISTINCT \"");
+ query_string->append(attr_name);
+ query_string->append("\")");
+ if (is_min_applicable) {
+ query_string->append(", MIN(\"");
+ query_string->append(attr_name);
+ query_string->append("\")");
+ }
+ if (is_max_applicable) {
+ query_string->append(", MAX(\"");
+ query_string->append(attr_name);
+ query_string->append("\")");
+ }
+ query_string->append(" FROM \"");
+ query_string->append(rel_name);
+ query_string->append("\";");
+
+ submitQuery(sender, query_string,
+ new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
+ attribute.getID(), is_min_applicable, is_max_applicable));
+ }
+
+ // Get the number of tuples for the relation.
+ string *query_string = new string("SELECT COUNT(*) FROM \"");
+ query_string->append(rel_name);
+ query_string->append("\";");
+
+ submitQuery(sender, query_string,
+ new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
+ }
+}
+
+void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
+ SqlParserWrapper parser_wrapper;
+ parser_wrapper.feedNextBuffer(query);
+ ParseResult parse_result = parser_wrapper.getNextStatement();
+ DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+ const ParseStatement &statement = *parse_result.parsed_statement;
+
+ // Generate the query plan.
+ auto query_handle =
+ make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
+ query_processor_->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 0c0f7e5..28c8e02 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,15 +26,17 @@
#include "cli/distributed/Role.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "utility/Macros.hpp"
+#include "utility/PtrVector.hpp"
#include "tmb/id_typedefs.h"
namespace quickstep {
class CatalogDatabase;
-class ParseCommand;
+class ParseString;
/** \addtogroup CliDistributed
* @{
@@ -62,6 +64,10 @@ class Conductor final : public Role {
private:
void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+ void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
+
+ void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
+
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 23b706f..82ff183 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -101,8 +101,10 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
+ quickstep_cli_Flags
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
@@ -112,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_DataExchangerAsync
quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
@@ -151,7 +155,10 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
glog
+ quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogRelationSchema
+ quickstep_catalog_CatalogRelationStatistics
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_PolicyEnforcerBase
@@ -166,7 +173,12 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_ShiftbossDirectory
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryProcessor
+ quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleIdSequence
+ quickstep_storage_TupleStorageSubBlock
+ quickstep_types_TypedValue
quickstep_utility_ExecutionDAGVisualizer
quickstep_utility_Macros
tmb
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 57f432f..3d47fb6 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -27,8 +27,10 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
@@ -37,7 +39,9 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -49,6 +53,7 @@
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"
+using std::make_unique;
using std::move;
using std::size_t;
using std::unique_ptr;
@@ -75,6 +80,7 @@ ForemanDistributed::ForemanDistributed(
block_locator_(block_locator),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
+ kBlockDomainRegistrationMessage,
kShiftbossRegistrationResponseMessage,
kQueryInitiateMessage,
kWorkOrderMessage,
@@ -82,6 +88,7 @@ ForemanDistributed::ForemanDistributed(
kQueryTeardownMessage,
kSaveQueryResultMessage,
kQueryExecutionSuccessMessage,
+ kCommandResponseMessage,
kPoisonMessage};
for (const auto message_type : sender_message_types) {
@@ -89,6 +96,7 @@ ForemanDistributed::ForemanDistributed(
}
const std::vector<QueryExecutionMessageType> receiver_message_types{
+ kBlockDomainRegistrationResponseMessage,
kShiftbossRegistrationMessage,
kAdmitRequestMessage,
kQueryInitiateResponseMessage,
@@ -105,12 +113,17 @@ ForemanDistributed::ForemanDistributed(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
- foreman_client_id_,
- catalog_database_,
- query_processor,
- &shiftboss_directory_,
- bus_);
+ client_id locator_client_id;
+ storage_manager_ = make_unique<StorageManager>(
+ FLAGS_storage_path,
+ block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
+ locator_client_id, bus_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());
+ data_exchanger_.start();
+
+ policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
+ foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
}
void ForemanDistributed::run() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 7fc98bd..4053b9d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -25,6 +25,8 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/ForemanBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -70,7 +72,11 @@ class ForemanDistributed final : public ForemanBase {
QueryProcessor *query_processor,
const int cpu_id = -1);
- ~ForemanDistributed() override {}
+ ~ForemanDistributed() override {
+ data_exchanger_.shutdown();
+ storage_manager_.reset();
+ data_exchanger_.join();
+ }
void printWorkOrderProfilingResults(const std::size_t query_id,
std::FILE *out) const override;
@@ -126,6 +132,10 @@ class ForemanDistributed final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
+ // Used for '\analyze'.
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+
// From a query id to a set of Shiftbosses that save query result.
std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 25f2d72..424452e 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -26,7 +26,10 @@
#include <vector>
#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogRelationStatistics.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionState.hpp"
@@ -36,7 +39,12 @@
#include "query_execution/QueryManagerDistributed.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypedValue.hpp"
#include "utility/ExecutionDAGVisualizer.hpp"
#include "gflags/gflags.h"
@@ -50,7 +58,9 @@
using std::free;
using std::malloc;
using std::move;
+using std::ostringstream;
using std::size_t;
+using std::string;
using std::unique_ptr;
using std::vector;
@@ -231,7 +241,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
const QueryHandle *query_handle = query_manager->query_handle();
- const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+ const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
const tmb::client_id cli_id = query_handle->getClientId();
const std::size_t query_id = query_handle->query_id();
@@ -259,7 +269,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- if (query_result == nullptr) {
+ if (query_result_relation == nullptr) {
if (query_processor_) {
query_processor_->saveCatalog();
}
@@ -272,17 +282,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryTeardownMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
free(proto_bytes);
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
<< "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
- shiftboss_addresses,
- move(message),
- bus_);
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
TaggedMessage cli_message(kQueryExecutionSuccessMessage);
@@ -299,12 +304,33 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
return;
}
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+ if (analyze_query_info) {
+ processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+
+ // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+ S::QueryTeardownMessage proto;
+ proto.set_query_id(query_id);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+ return;
+ }
+
// NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
S::SaveQueryResultMessage proto;
proto.set_query_id(query_id);
- proto.set_relation_id(query_result->getID());
+ proto.set_relation_id(query_result_relation->getID());
- const vector<block_id> blocks(query_result->getBlocksSnapshot());
+ const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
for (const block_id block : blocks) {
proto.add_blocks(block);
}
@@ -315,18 +341,111 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kSaveQueryResultMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
free(proto_bytes);
// TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
<< "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
- shiftboss_addresses,
- move(message),
- bus_);
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+}
+
+void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id cli_id,
+ const CatalogRelation *query_result_relation,
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info) {
+ const relation_id rel_id = analyze_query_info->rel_id;
+ CatalogRelation *mutable_relation =
+ static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(rel_id);
+ CatalogRelationStatistics *mutable_stat = mutable_relation->getStatisticsMutable();
+
+ const auto analyze_query_result = [this, &query_result_relation]() {
+ const vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+ DCHECK_EQ(1u, blocks.size());
+
+ vector<TypedValue> query_result;
+ {
+ BlockReference block = storage_manager_->getBlock(blocks.front(), *query_result_relation);
+ const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+ DCHECK_EQ(1, tuple_store.numTuples());
+
+ const std::size_t num_columns = tuple_store.getRelation().size();
+ if (tuple_store.isPacked()) {
+ for (std::size_t i = 0; i < num_columns; ++i) {
+ query_result.emplace_back(tuple_store.getAttributeValueTyped(0, i));
+ }
+ } else {
+ std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+ for (std::size_t i = 0; i < num_columns; ++i) {
+ query_result.emplace_back(
+ tuple_store.getAttributeValueTyped(*existence_map->begin(), i));
+ }
+ }
+ }
+
+ // Clean up the query result relation.
+ for (const block_id block : blocks) {
+ storage_manager_->deleteBlockOrBlobFile(block);
+ }
+ catalog_database_->dropRelationById(query_result_relation->getID());
+
+ return query_result;
+ }();
+
+ if (analyze_query_info->is_analyze_attribute_query) {
+ const attribute_id attr_id = analyze_query_info->attr_id;
+
+ auto cit = analyze_query_result.begin();
+ DCHECK_EQ(TypeID::kLong, cit->getTypeID());
+ mutable_stat->setNumDistinctValues(attr_id, cit->getLiteral<std::int64_t>());
+
+ if (analyze_query_info->is_min_applicable) {
+ ++cit;
+ mutable_stat->setMinValue(attr_id, *cit);
+ }
+
+ if (analyze_query_info->is_max_applicable) {
+ ++cit;
+ mutable_stat->setMaxValue(attr_id, *cit);
+ }
+ } else {
+ completed_analyze_relations_[cli_id].push_back(rel_id);
+
+ DCHECK_EQ(1u, analyze_query_result.size());
+ const TypedValue &num_tuples = analyze_query_result.front();
+ DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID());
+ mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>());
+
+ mutable_stat->setExactness(true);
+
+ if (completed_analyze_relations_[cli_id].size() == analyze_query_info->num_relations) {
+ query_processor_->markCatalogAltered();
+ query_processor_->saveCatalog();
+
+ ostringstream analyze_command_response;
+ for (const relation_id rel_id : completed_analyze_relations_[cli_id]) {
+ analyze_command_response << "Analyzing " << catalog_database_->getRelationSchemaById(rel_id).getName()
+ << " ... done\n";
+ }
+
+ S::CommandResponseMessage proto;
+ proto.set_command_response(analyze_command_response.str());
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
+ << "') to CLI with TMB client id " << cli_id;
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+ completed_analyze_relations_.erase(cli_id);
+ }
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 18fd9ae..5334da0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -28,6 +28,7 @@
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
+#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -42,7 +43,9 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
+class CatalogRelation;
class QueryProcessor;
+class StorageManager;
/** \addtogroup QueryExecution
* @{
@@ -61,16 +64,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @param catalog_database The CatalogDatabase used.
* @param query_processor The QueryProcessor to save catalog upon the query
* completion.
+ * @param storage_manager The StorageManager to use.
+ * @param shiftboss_directory The ShiftbossDirectory to manage Shiftbosses.
* @param bus The TMB.
**/
PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
CatalogDatabaseLite *catalog_database,
QueryProcessor *query_processor,
+ StorageManager *storage_manager,
ShiftbossDirectory *shiftboss_directory,
tmb::MessageBus *bus)
: PolicyEnforcerBase(catalog_database),
foreman_client_id_(foreman_client_id),
query_processor_(query_processor),
+ storage_manager_(storage_manager),
shiftboss_directory_(shiftboss_directory),
bus_(bus) {}
@@ -157,13 +164,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void initiateQueryInShiftboss(QueryHandle *query_handle);
+ void processAnalyzeQueryResult(const tmb::client_id cli_id,
+ const CatalogRelation *query_result_relation,
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info);
+
const tmb::client_id foreman_client_id_;
QueryProcessor *query_processor_;
+ StorageManager *storage_manager_;
ShiftbossDirectory *shiftboss_directory_;
tmb::MessageBus *bus_;
+ std::unordered_map<tmb::client_id, std::vector<relation_id>> completed_analyze_relations_;
+
DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index a49de5e..081852f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,7 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
kDistributedCliRegistrationMessage, // From CLI to Conductor.
kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
- kSqlQueryMessage, // From CLI to Conductor.
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 3ff783c..9bdb753 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -229,6 +229,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_utility_Macros
quickstep_utility_PlanVisualizer)
target_link_libraries(quickstep_queryoptimizer_QueryHandle
+ quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_QueryPlan
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 7cb4f68..6feeb4c 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -25,6 +25,7 @@
#include <memory>
#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
@@ -45,19 +46,47 @@ class CatalogRelation;
**/
class QueryHandle {
public:
+ // The info for generated queries during executing '\analyze'.
+ struct AnalyzeQueryInfo {
+ AnalyzeQueryInfo(const bool is_analyze_attribute_query_in,
+ const relation_id rel_id_in,
+ const std::size_t num_relations_in,
+ const attribute_id attr_id_in = kInvalidCatalogId,
+ const bool is_min_applicable_in = false,
+ const bool is_max_applicable_in = false)
+ : is_analyze_attribute_query(is_analyze_attribute_query_in),
+ rel_id(rel_id_in),
+ num_relations(num_relations_in),
+ attr_id(attr_id_in),
+ is_min_applicable(is_min_applicable_in),
+ is_max_applicable(is_max_applicable_in) {}
+
+ const bool is_analyze_attribute_query;
+ const relation_id rel_id;
+ const std::size_t num_relations;
+
+ // Only valid if 'is_analyze_attribute_query' is true.
+ const attribute_id attr_id;
+ const bool is_min_applicable;
+ const bool is_max_applicable;
+ };
+
/**
* @brief Constructor.
*
* @param query_id The given query id.
* @param cli_id The client id of the CLI which submits the query.
* @param query_priority The priority of this query.
+ * @param analyze_query_info The info of this analyze query.
*/
- explicit QueryHandle(const std::size_t query_id,
- const tmb::client_id cli_id,
- const std::uint64_t query_priority = 1)
+ QueryHandle(const std::size_t query_id,
+ const tmb::client_id cli_id,
+ const std::uint64_t query_priority = 1,
+ AnalyzeQueryInfo *analyze_query_info = nullptr)
: query_id_(query_id),
cli_id_(cli_id),
query_priority_(query_priority),
+ analyze_query_info_(analyze_query_info),
query_plan_(new QueryPlan()),
query_result_relation_(nullptr) {}
@@ -87,6 +116,13 @@ class QueryHandle {
}
/**
+ * @brief Get the query info for the command '\analyze'.
+ **/
+ const AnalyzeQueryInfo* analyze_query_info() const {
+ return analyze_query_info_.get();
+ }
+
+ /**
* @return The const query plan.
*/
const QueryPlan& getQueryPlan() const {
@@ -165,6 +201,7 @@ class QueryHandle {
const tmb::client_id cli_id_;
const std::uint64_t query_priority_;
+ std::unique_ptr<AnalyzeQueryInfo> analyze_query_info_;
std::unique_ptr<QueryPlan> query_plan_;
[2/8] incubator-quickstep git commit: Refactored catalog saving in
the distributed version.
Posted by zu...@apache.org.
Refactored catalog saving in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccb2852f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccb2852f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccb2852f
Branch: refs/heads/dist-patch
Commit: ccb2852f71da77d364d4bfcb276cb6318b751a8c
Parents: 964a806
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 17:14:50 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 17:14:50 2017 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/ForemanDistributed.cpp | 5 ++---
query_execution/ForemanDistributed.hpp | 6 +++---
query_execution/PolicyEnforcerDistributed.cpp | 5 ++++-
query_execution/PolicyEnforcerDistributed.hpp | 13 ++++++-------
.../tests/DistributedExecutionGeneratorTestRunner.cpp | 6 ++----
6 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 12d6be0..23b706f 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -165,6 +165,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_ShiftbossDirectory
quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
quickstep_storage_StorageBlockInfo
quickstep_utility_ExecutionDAGVisualizer
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 389d6ab..57f432f 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,7 +17,6 @@
#include <cstddef>
#include <cstdio>
#include <cstdlib>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -68,9 +67,9 @@ class QueryHandle;
ForemanDistributed::ForemanDistributed(
const BlockLocator &block_locator,
- std::function<void()> &&save_catalog_callback,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
const int cpu_id)
: ForemanBase(bus, cpu_id),
block_locator_(block_locator),
@@ -108,8 +107,8 @@ ForemanDistributed::ForemanDistributed(
policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
foreman_client_id_,
- move(save_catalog_callback),
catalog_database_,
+ query_processor,
&shiftboss_directory_,
bus_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 5f1a14b..7fc98bd 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,7 +17,6 @@
#include <cstddef>
#include <cstdio>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
@@ -36,6 +35,7 @@ namespace quickstep {
class BlockLocator;
class CatalogDatabaseLite;
+class QueryProcessor;
namespace serialization { class WorkOrderMessage; }
@@ -56,7 +56,7 @@ class ForemanDistributed final : public ForemanBase {
* @param block_locator The block locator that manages block location info.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
- * @param save_catalog_callback The callback used to save catalog upon the query
+ * @param query_processor The QueryProcessor to save catalog upon the query
* completion.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
*
@@ -65,9 +65,9 @@ class ForemanDistributed final : public ForemanBase {
**/
ForemanDistributed(
const BlockLocator &block_locator,
- std::function<void()> &&save_catalog_callback,
tmb::MessageBus *bus,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
const int cpu_id = -1);
~ForemanDistributed() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6ee58a8..25f2d72 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -35,6 +35,7 @@
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/QueryManagerDistributed.hpp"
#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/ExecutionDAGVisualizer.hpp"
@@ -259,7 +260,9 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
}
if (query_result == nullptr) {
- save_catalog_callback_();
+ if (query_processor_) {
+ query_processor_->saveCatalog();
+ }
// Clean up query execution states, i.e., QueryContext, in Shiftbosses.
serialization::QueryTeardownMessage proto;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index f8476c8..18fd9ae 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,7 +16,6 @@
#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
#include <cstddef>
-#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
@@ -43,6 +42,7 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
+class QueryProcessor;
/** \addtogroup QueryExecution
* @{
@@ -58,19 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @brief Constructor.
*
* @param foreman_client_id The TMB client ID of the Foreman.
- * @param save_catalog_callback The callback used to save catalog upon the query
- * completion.
* @param catalog_database The CatalogDatabase used.
+ * @param query_processor The QueryProcessor to save catalog upon the query
+ * completion.
* @param bus The TMB.
**/
PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
- std::function<void()> &&save_catalog_callback,
CatalogDatabaseLite *catalog_database,
+ QueryProcessor *query_processor,
ShiftbossDirectory *shiftboss_directory,
tmb::MessageBus *bus)
: PolicyEnforcerBase(catalog_database),
foreman_client_id_(foreman_client_id),
- save_catalog_callback_(std::move(save_catalog_callback)),
+ query_processor_(query_processor),
shiftboss_directory_(shiftboss_directory),
bus_(bus) {}
@@ -159,8 +159,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const tmb::client_id foreman_client_id_;
- const std::function<void()> save_catalog_callback_;
-
+ QueryProcessor *query_processor_;
ShiftbossDirectory *shiftboss_directory_;
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 3b1259a..0eeb83f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -71,8 +71,6 @@ namespace {
constexpr int kNumInstances = 3;
-void nop() {}
-
} // namespace
const char *DistributedExecutionGeneratorTestRunner::kResetOption =
@@ -110,8 +108,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
// NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
// could receive a registration message from the latter.
- foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
- test_database_loader_->catalog_database());
+ foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
+ nullptr /* query_processor */);
// We don't use the NUMA aware version of worker code.
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,