You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/03 12:14:48 UTC
[4/8] incubator-quickstep git commit: Added \analyze support in the
distributed version.
Added \analyze support in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e37ec541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e37ec541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e37ec541
Branch: refs/heads/dist-patch
Commit: e37ec541c1490d8434eb441a1c0389e20f80ec6e
Parents: 7f25d1c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 23:36:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:09:36 2017 -0800
----------------------------------------------------------------------
cli/CMakeLists.txt | 2 +-
cli/distributed/CMakeLists.txt | 7 +
cli/distributed/Cli.cpp | 8 +-
cli/distributed/Conductor.cpp | 99 ++++++++++++-
cli/distributed/Conductor.hpp | 8 +-
query_execution/CMakeLists.txt | 12 ++
query_execution/ForemanDistributed.cpp | 25 +++-
query_execution/ForemanDistributed.hpp | 12 +-
query_execution/PolicyEnforcerDistributed.cpp | 155 ++++++++++++++++++---
query_execution/PolicyEnforcerDistributed.hpp | 14 ++
query_execution/QueryExecutionTypedefs.hpp | 2 +-
query_optimizer/CMakeLists.txt | 1 +
query_optimizer/QueryHandle.hpp | 43 +++++-
13 files changed, 351 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index ffeed2f..4562fe6 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -84,6 +84,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_expressions_aggregation_AggregateFunctionMax
quickstep_expressions_aggregation_AggregateFunctionMin
quickstep_parser_ParseStatement
+ quickstep_parser_ParseString
quickstep_parser_SqlParserWrapper
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryPlan
@@ -93,7 +94,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
quickstep_storage_StorageManager
quickstep_storage_TupleIdSequence
quickstep_storage_TupleStorageSubBlock
- quickstep_parser_ParseString
quickstep_types_Type
quickstep_types_TypeID
quickstep_types_TypedValue
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 1f7dee0..2db27e5 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,13 +25,19 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
# Link dependencies:
target_link_libraries(quickstep_cli_distributed_Conductor
glog
+ quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
quickstep_cli_CommandExecutorUtil
quickstep_cli_Constants
quickstep_cli_DefaultsConfigurator
quickstep_cli_Flags
quickstep_cli_distributed_Role
+ quickstep_expressions_aggregation_AggregateFunctionMax
+ quickstep_expressions_aggregation_AggregateFunctionMin
quickstep_parser_ParseStatement
+ quickstep_parser_ParseString
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_BlockLocator
quickstep_queryexecution_ForemanDistributed
@@ -42,6 +48,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
quickstep_queryoptimizer_QueryProcessor
quickstep_storage_StorageConstants
quickstep_utility_Macros
+ quickstep_utility_PtrVector
quickstep_utility_SqlError
quickstep_utility_StringUtil
tmb)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 63f3259..14880a7 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -167,11 +167,9 @@ void Cli::run() {
const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
const std::string &command = parse_command.command()->value();
try {
- if (command == C::kAnalyzeCommand) {
- // TODO(zuyu): support '\analyze'.
- THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
- } else if (command != C::kDescribeDatabaseCommand &&
- command != C::kDescribeTableCommand) {
+ if (command != C::kAnalyzeCommand &&
+ command != C::kDescribeDatabaseCommand &&
+ command != C::kDescribeTableCommand) {
THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
}
} catch (const SqlError &error) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 1b8bfb2..5fb4453 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -27,13 +27,20 @@
#include <sstream>
#include <string>
#include <utility>
+#include <vector>
+#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "cli/CommandExecutorUtil.hpp"
#include "cli/Constants.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
+#include "expressions/aggregation/AggregateFunctionMax.hpp"
+#include "expressions/aggregation/AggregateFunctionMin.hpp"
#include "parser/ParseStatement.hpp"
+#include "parser/ParseString.hpp"
#include "parser/SqlParserWrapper.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
@@ -43,6 +50,7 @@
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConstants.hpp"
+#include "utility/PtrVector.hpp"
#include "utility/SqlError.hpp"
#include "utility/StringUtil.hpp"
@@ -69,6 +77,8 @@ namespace quickstep {
namespace C = cli;
namespace S = serialization;
+class Type;
+
void Conductor::init() {
try {
string catalog_path = FLAGS_storage_path + kCatalogFilename;
@@ -154,7 +164,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
SqlParserWrapper parser_wrapper;
parser_wrapper.feedNextBuffer(command_string);
ParseResult parse_result = parser_wrapper.getNextStatement();
- CHECK(parse_result.condition == ParseResult::kSuccess)
+ CHECK_EQ(ParseResult::kSuccess, parse_result.condition)
<< "Any syntax error should be addressed in the DistributedCli.";
const ParseStatement &statement = *parse_result.parsed_statement;
@@ -165,6 +175,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
const PtrVector<ParseString> &arguments = *(parse_command.arguments());
const string &command = parse_command.command()->value();
+ if (command == C::kAnalyzeCommand) {
+ executeAnalyze(sender, arguments);
+ return;
+ }
+
string command_response;
if (command == C::kDescribeDatabaseCommand) {
command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
@@ -225,4 +240,86 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
}
}
+// Handles the '\analyze' command: for every selected relation, submits one
+// generated query per attribute (COUNT(DISTINCT) plus MIN / MAX where those
+// aggregates apply to the attribute type) followed by one COUNT(*) query.
+// Each query carries an AnalyzeQueryInfo describing what it measures.
+//
+// With no arguments every relation in the catalog database is selected;
+// otherwise each argument must name an existing relation, else a SQL error
+// is thrown before anything is submitted.
+//
+// NOTE(review): when no relation is selected nothing is submitted here;
+// confirm that the CLI still receives a response in that case.
+void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) {
+  std::vector<std::reference_wrapper<const CatalogRelation>> targets;
+  if (!arguments.empty()) {
+    for (const auto &table_name : arguments) {
+      const CatalogRelation *found = catalog_database_->getRelationByName(table_name.value());
+      if (found == nullptr) {
+        THROW_SQL_ERROR_AT(&table_name) << "Table does not exist";
+      }
+
+      targets.emplace_back(*found);
+    }
+  } else {
+    targets.insert(targets.end(), catalog_database_->begin(), catalog_database_->end());
+  }
+
+  // Every generated query records how many relations this command covers.
+  const auto num_targets = targets.size();
+
+  for (const CatalogRelation &target : targets) {
+    const relation_id target_id = target.getID();
+    // Names may contain arbitrary characters (e.g. CREATE TABLE "with space"("1" int)),
+    // so they are escaped here and wrapped in double quotes inside the SQL text.
+    const string escaped_rel_name = EscapeQuotes(target.getName(), '"');
+
+    // One statistics query per attribute.
+    for (const CatalogAttribute &column : target) {
+      const string escaped_attr_name = EscapeQuotes(column.getName(), '"');
+      const Type &column_type = column.getType();
+      const bool min_ok = AggregateFunctionMin::Instance().canApplyToTypes({&column_type});
+      const bool max_ok = AggregateFunctionMax::Instance().canApplyToTypes({&column_type});
+
+      string *attr_query = new string("SELECT COUNT(DISTINCT \"");
+      attr_query->append(escaped_attr_name).append("\")");
+      if (min_ok) {
+        attr_query->append(", MIN(\"").append(escaped_attr_name).append("\")");
+      }
+      if (max_ok) {
+        attr_query->append(", MAX(\"").append(escaped_attr_name).append("\")");
+      }
+      attr_query->append(" FROM \"").append(escaped_rel_name).append("\";");
+
+      submitQuery(sender, attr_query,
+                  new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, target_id, num_targets,
+                                                    column.getID(), min_ok, max_ok));
+    }
+
+    // One tuple-count query per relation, submitted after its attribute queries.
+    string *count_query = new string("SELECT COUNT(*) FROM \"");
+    count_query->append(escaped_rel_name).append("\";");
+
+    submitQuery(sender, count_query,
+                new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, target_id, num_targets));
+  }
+}
+
+// Parses one generated '\analyze' query, builds a QueryHandle for it on behalf
+// of the CLI 'sender', and sends an admit request for that handle to the
+// Foreman. The QueryHandle stores 'query_info' in a unique_ptr and is itself
+// released into the admit request.
+// NOTE(review): 'query' is handed to the parser wrapper and never freed here;
+// presumably the wrapper takes ownership -- confirm.
+void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
+  SqlParserWrapper parser;
+  parser.feedNextBuffer(query);
+  ParseResult result = parser.getNextStatement();
+  // The query text is generated by this class, so it is expected to parse.
+  DCHECK_EQ(ParseResult::kSuccess, result.condition);
+
+  const ParseStatement &parsed_statement = *result.parsed_statement;
+
+  // Build the handle first, then let the query processor fill in its plan.
+  auto handle =
+      make_unique<QueryHandle>(query_processor_->query_id(), sender, parsed_statement.getPriority(), query_info);
+  QueryHandle *handle_ptr = handle.get();
+  query_processor_->generateQueryHandle(parsed_statement, handle_ptr);
+  DCHECK(handle_ptr->getQueryPlanMutable() != nullptr);
+
+  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+      conductor_client_id_, foreman_->getBusClientID(), handle.release(), &bus_);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 0c0f7e5..28c8e02 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,15 +26,17 @@
#include "cli/distributed/Role.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "utility/Macros.hpp"
+#include "utility/PtrVector.hpp"
#include "tmb/id_typedefs.h"
namespace quickstep {
class CatalogDatabase;
-class ParseCommand;
+class ParseString;
/** \addtogroup CliDistributed
* @{
@@ -62,6 +64,10 @@ class Conductor final : public Role {
private:
void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+ void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
+
+ void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
+
std::unique_ptr<QueryProcessor> query_processor_;
// Not owned.
CatalogDatabase *catalog_database_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 23b706f..82ff183 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -101,8 +101,10 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
+ quickstep_cli_Flags
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_BlockLocatorUtil
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
@@ -112,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_DataExchangerAsync
quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
@@ -151,7 +155,10 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
glog
+ quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogRelationSchema
+ quickstep_catalog_CatalogRelationStatistics
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_PolicyEnforcerBase
@@ -166,7 +173,12 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_ShiftbossDirectory
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryProcessor
+ quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleIdSequence
+ quickstep_storage_TupleStorageSubBlock
+ quickstep_types_TypedValue
quickstep_utility_ExecutionDAGVisualizer
quickstep_utility_Macros
tmb
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 57f432f..3d47fb6 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -27,8 +27,10 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
@@ -37,7 +39,9 @@
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
+#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -49,6 +53,7 @@
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"
+using std::make_unique;
using std::move;
using std::size_t;
using std::unique_ptr;
@@ -75,6 +80,7 @@ ForemanDistributed::ForemanDistributed(
block_locator_(block_locator),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
+ kBlockDomainRegistrationMessage,
kShiftbossRegistrationResponseMessage,
kQueryInitiateMessage,
kWorkOrderMessage,
@@ -82,6 +88,7 @@ ForemanDistributed::ForemanDistributed(
kQueryTeardownMessage,
kSaveQueryResultMessage,
kQueryExecutionSuccessMessage,
+ kCommandResponseMessage,
kPoisonMessage};
for (const auto message_type : sender_message_types) {
@@ -89,6 +96,7 @@ ForemanDistributed::ForemanDistributed(
}
const std::vector<QueryExecutionMessageType> receiver_message_types{
+ kBlockDomainRegistrationResponseMessage,
kShiftbossRegistrationMessage,
kAdmitRequestMessage,
kQueryInitiateResponseMessage,
@@ -105,12 +113,17 @@ ForemanDistributed::ForemanDistributed(
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
- foreman_client_id_,
- catalog_database_,
- query_processor,
- &shiftboss_directory_,
- bus_);
+ client_id locator_client_id;
+ storage_manager_ = make_unique<StorageManager>(
+ FLAGS_storage_path,
+ block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
+ locator_client_id, bus_);
+
+ data_exchanger_.set_storage_manager(storage_manager_.get());
+ data_exchanger_.start();
+
+ policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
+ foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
}
void ForemanDistributed::run() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 7fc98bd..4053b9d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -25,6 +25,8 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/ForemanBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -70,7 +72,11 @@ class ForemanDistributed final : public ForemanBase {
QueryProcessor *query_processor,
const int cpu_id = -1);
- ~ForemanDistributed() override {}
+ ~ForemanDistributed() override {  // Tears down the members used for '\analyze'.
+ data_exchanger_.shutdown();  // Called before the StorageManager is destroyed.
+ storage_manager_.reset();  // Destroys the StorageManager that was handed to data_exchanger_ in the constructor.
+ data_exchanger_.join();  // NOTE(review): shutdown -> reset -> join order looks deliberate; confirm before reordering.
+ }
void printWorkOrderProfilingResults(const std::size_t query_id,
std::FILE *out) const override;
@@ -126,6 +132,10 @@ class ForemanDistributed final : public ForemanBase {
CatalogDatabaseLite *catalog_database_;
+ // Used for '\analyze'.
+ DataExchangerAsync data_exchanger_;
+ std::unique_ptr<StorageManager> storage_manager_;
+
// From a query id to a set of Shiftbosses that save query result.
std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 25f2d72..424452e 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -26,7 +26,10 @@
#include <vector>
#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogRelationStatistics.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionState.hpp"
@@ -36,7 +39,12 @@
#include "query_execution/QueryManagerDistributed.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypedValue.hpp"
#include "utility/ExecutionDAGVisualizer.hpp"
#include "gflags/gflags.h"
@@ -50,7 +58,9 @@
using std::free;
using std::malloc;
using std::move;
+using std::ostringstream;
using std::size_t;
+using std::string;
using std::unique_ptr;
using std::vector;
@@ -231,7 +241,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
const QueryHandle *query_handle = query_manager->query_handle();
- const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+ const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
const tmb::client_id cli_id = query_handle->getClientId();
const std::size_t query_id = query_handle->query_id();
@@ -259,7 +269,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
}
- if (query_result == nullptr) {
+ if (query_result_relation == nullptr) {
if (query_processor_) {
query_processor_->saveCatalog();
}
@@ -272,17 +282,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kQueryTeardownMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
free(proto_bytes);
DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
<< "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
- shiftboss_addresses,
- move(message),
- bus_);
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
TaggedMessage cli_message(kQueryExecutionSuccessMessage);
@@ -299,12 +304,33 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
return;
}
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+ if (analyze_query_info) {
+ processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+
+ // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+ S::QueryTeardownMessage proto;
+ proto.set_query_id(query_id);
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
+ free(proto_bytes);
+
+ DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to all Shiftbosses";
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+ return;
+ }
+
// NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
S::SaveQueryResultMessage proto;
proto.set_query_id(query_id);
- proto.set_relation_id(query_result->getID());
+ proto.set_relation_id(query_result_relation->getID());
- const vector<block_id> blocks(query_result->getBlocksSnapshot());
+ const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
for (const block_id block : blocks) {
proto.add_blocks(block);
}
@@ -315,18 +341,111 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kSaveQueryResultMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
free(proto_bytes);
// TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
<< "') to all Shiftbosses";
- QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
- shiftboss_addresses,
- move(message),
- bus_);
+ QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+}
+
+// Folds the single-row result of one '\analyze'-generated query into the
+// statistics of the analyzed relation, and discards the temporary result
+// relation (its blocks and its catalog entry).
+//
+// @param cli_id The TMB client id of the CLI that issued '\analyze'.
+// @param query_result_relation The temporary relation holding the one-row
+//        query result. It is dropped from the catalog by this call and must
+//        not be used afterwards.
+// @param analyze_query_info Describes which relation / attribute the query
+//        measured and how many relations the whole command covers.
+//
+// For an attribute query the row is (COUNT(DISTINCT) [, MIN] [, MAX]); for a
+// relation query it is (COUNT(*)). Once as many relation queries have
+// completed for 'cli_id' as the command covers relations, the catalog is
+// saved and a CommandResponseMessage is sent back to the CLI.
+//
+// NOTE(review): completion is tracked through the COUNT(*) queries only;
+// confirm that attribute queries of the same command cannot still be in
+// flight when the response is sent.
+void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id cli_id,
+                                                          const CatalogRelation *query_result_relation,
+                                                          const QueryHandle::AnalyzeQueryInfo *analyze_query_info) {
+  const relation_id rel_id = analyze_query_info->rel_id;
+  CatalogRelation *mutable_relation =
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(rel_id);
+  CatalogRelationStatistics *mutable_stat = mutable_relation->getStatisticsMutable();
+
+  // Reads the only tuple of the result relation, then deletes the relation.
+  const auto analyze_query_result = [this, &query_result_relation]() {
+    const vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+    DCHECK_EQ(1u, blocks.size());
+
+    vector<TypedValue> query_result;
+    {
+      // Scoped so that the block reference is released before the block file
+      // is deleted below.
+      BlockReference block = storage_manager_->getBlock(blocks.front(), *query_result_relation);
+      const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+      DCHECK_EQ(1, tuple_store.numTuples());
+
+      const std::size_t num_columns = tuple_store.getRelation().size();
+      query_result.reserve(num_columns);
+      if (tuple_store.isPacked()) {
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(tuple_store.getAttributeValueTyped(0, i));
+          // A value read from a block may merely reference out-of-line data
+          // owned by that block (e.g. MIN/MAX over a variable-length column).
+          // Take a private copy while the block is still pinned, because the
+          // block is deleted before the value is consumed.
+          query_result.back().ensureNotReference();
+        }
+      } else {
+        std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(
+              tuple_store.getAttributeValueTyped(*existence_map->begin(), i));
+          query_result.back().ensureNotReference();
+        }
+      }
+    }
+
+    // Clean up the query result relation.
+    for (const block_id block : blocks) {
+      storage_manager_->deleteBlockOrBlobFile(block);
+    }
+    catalog_database_->dropRelationById(query_result_relation->getID());
+
+    return query_result;
+  }();
+
+  if (analyze_query_info->is_analyze_attribute_query) {
+    const attribute_id attr_id = analyze_query_info->attr_id;
+
+    // Column order matches the generated SELECT list:
+    // COUNT(DISTINCT), then MIN if applicable, then MAX if applicable.
+    auto cit = analyze_query_result.begin();
+    DCHECK_EQ(TypeID::kLong, cit->getTypeID());
+    mutable_stat->setNumDistinctValues(attr_id, cit->getLiteral<std::int64_t>());
+
+    if (analyze_query_info->is_min_applicable) {
+      ++cit;
+      mutable_stat->setMinValue(attr_id, *cit);
+    }
+
+    if (analyze_query_info->is_max_applicable) {
+      ++cit;
+      mutable_stat->setMaxValue(attr_id, *cit);
+    }
+  } else {
+    // Look the per-CLI progress list up once; the reference stays valid until
+    // the entry is erased at the end of this branch.
+    vector<relation_id> &completed_relations = completed_analyze_relations_[cli_id];
+    completed_relations.push_back(rel_id);
+
+    DCHECK_EQ(1u, analyze_query_result.size());
+    const TypedValue &num_tuples = analyze_query_result.front();
+    DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID());
+    mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>());
+
+    mutable_stat->setExactness(true);
+
+    if (completed_relations.size() == analyze_query_info->num_relations) {
+      query_processor_->markCatalogAltered();
+      query_processor_->saveCatalog();
+
+      ostringstream analyze_command_response;
+      for (const relation_id completed_rel_id : completed_relations) {
+        analyze_command_response << "Analyzing "
+                                 << catalog_database_->getRelationSchemaById(completed_rel_id).getName()
+                                 << " ... done\n";
+      }
+
+      S::CommandResponseMessage proto;
+      proto.set_command_response(analyze_command_response.str());
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+      free(proto_bytes);
+
+      DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
+                 << "') to CLI with TMB client id " << cli_id;
+      const tmb::MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+      // Invalidates 'completed_relations'; it is not used past this point.
+      completed_analyze_relations_.erase(cli_id);
+    }
+  }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 18fd9ae..5334da0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -28,6 +28,7 @@
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
+#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -42,7 +43,9 @@ class TaggedMessage;
namespace quickstep {
class CatalogDatabaseLite;
+class CatalogRelation;
class QueryProcessor;
+class StorageManager;
/** \addtogroup QueryExecution
* @{
@@ -61,16 +64,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @param catalog_database The CatalogDatabase used.
* @param query_processor The QueryProcessor to save catalog upon the query
* completion.
+ * @param storage_manager The StorageManager to use.
+ * @param shiftboss_directory The ShiftbossDirectory to manage Shiftbosses.
* @param bus The TMB.
**/
PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
CatalogDatabaseLite *catalog_database,
QueryProcessor *query_processor,
+ StorageManager *storage_manager,
ShiftbossDirectory *shiftboss_directory,
tmb::MessageBus *bus)
: PolicyEnforcerBase(catalog_database),
foreman_client_id_(foreman_client_id),
query_processor_(query_processor),
+ storage_manager_(storage_manager),
shiftboss_directory_(shiftboss_directory),
bus_(bus) {}
@@ -157,13 +164,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void initiateQueryInShiftboss(QueryHandle *query_handle);
+ void processAnalyzeQueryResult(const tmb::client_id cli_id,
+ const CatalogRelation *query_result_relation,
+ const QueryHandle::AnalyzeQueryInfo *analyze_query_info);
+
const tmb::client_id foreman_client_id_;
QueryProcessor *query_processor_;
+ StorageManager *storage_manager_;
ShiftbossDirectory *shiftboss_directory_;
tmb::MessageBus *bus_;
+ std::unordered_map<tmb::client_id, std::vector<relation_id>> completed_analyze_relations_;
+
DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index a49de5e..081852f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,7 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
kDistributedCliRegistrationMessage, // From CLI to Conductor.
kDistributedCliRegistrationResponseMessage, // From Conductor to CLI.
- kSqlQueryMessage, // From CLI to Conductor.
+ kSqlQueryMessage, // From CLI to Conductor.
kQueryInitiateMessage, // From Foreman to Shiftboss.
kQueryInitiateResponseMessage, // From Shiftboss to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 3ff783c..9bdb753 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -229,6 +229,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_utility_Macros
quickstep_utility_PlanVisualizer)
target_link_libraries(quickstep_queryoptimizer_QueryHandle
+ quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_QueryPlan
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 7cb4f68..6feeb4c 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -25,6 +25,7 @@
#include <memory>
#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
@@ -45,19 +46,47 @@ class CatalogRelation;
**/
class QueryHandle {
public:
+ // Describes one query generated while executing '\analyze': either a per-attribute statistics query or a per-relation tuple count.
+ struct AnalyzeQueryInfo {
+ AnalyzeQueryInfo(const bool is_analyze_attribute_query_in,
+ const relation_id rel_id_in,
+ const std::size_t num_relations_in,
+ const attribute_id attr_id_in = kInvalidCatalogId,  // Defaults below suit a relation (non-attribute) query.
+ const bool is_min_applicable_in = false,
+ const bool is_max_applicable_in = false)
+ : is_analyze_attribute_query(is_analyze_attribute_query_in),
+ rel_id(rel_id_in),
+ num_relations(num_relations_in),
+ attr_id(attr_id_in),
+ is_min_applicable(is_min_applicable_in),
+ is_max_applicable(is_max_applicable_in) {}
+
+ const bool is_analyze_attribute_query;  // True for a per-attribute query; false for the per-relation COUNT(*) query.
+ const relation_id rel_id;  // The relation being analyzed.
+ const std::size_t num_relations;  // Number of relations covered by the '\analyze' command that generated this query.
+
+ // Only valid if 'is_analyze_attribute_query' is true.
+ const attribute_id attr_id;  // The attribute being analyzed.
+ const bool is_min_applicable;  // Whether the generated query also selects MIN of the attribute.
+ const bool is_max_applicable;  // Whether the generated query also selects MAX of the attribute.
+ };
+
/**
* @brief Constructor.
*
* @param query_id The given query id.
* @param cli_id The client id of the CLI which submits the query.
* @param query_priority The priority of this query.
+ * @param analyze_query_info The info of this analyze query.
*/
- explicit QueryHandle(const std::size_t query_id,
- const tmb::client_id cli_id,
- const std::uint64_t query_priority = 1)
+ QueryHandle(const std::size_t query_id,
+ const tmb::client_id cli_id,
+ const std::uint64_t query_priority = 1,
+ AnalyzeQueryInfo *analyze_query_info = nullptr)
: query_id_(query_id),
cli_id_(cli_id),
query_priority_(query_priority),
+ analyze_query_info_(analyze_query_info),
query_plan_(new QueryPlan()),
query_result_relation_(nullptr) {}
@@ -87,6 +116,13 @@ class QueryHandle {
}
/**
+ * @brief Get the info attached to a query generated for the command '\analyze' (nullptr if none was given at construction).
+ **/
+ const AnalyzeQueryInfo* analyze_query_info() const {
+ return analyze_query_info_.get();
+ }
+
+ /**
* @return The const query plan.
*/
const QueryPlan& getQueryPlan() const {
@@ -165,6 +201,7 @@ class QueryHandle {
const tmb::client_id cli_id_;
const std::uint64_t query_priority_;
+ std::unique_ptr<AnalyzeQueryInfo> analyze_query_info_;
std::unique_ptr<QueryPlan> query_plan_;