You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/08 08:54:00 UTC
[17/18] incubator-quickstep git commit: Removed the temp query result
relation in the distributed version.
Removed the temp query result relation in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aa7f6fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aa7f6fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aa7f6fe4
Branch: refs/heads/tmb_poll_interval
Commit: aa7f6fe4e07804524aca0f1574935ae3f73c985d
Parents: dd8747f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 00:33:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 00:35:53 2017 -0800
----------------------------------------------------------------------
cli/distributed/CMakeLists.txt | 1 +
cli/distributed/Cli.cpp | 20 +++++++++++++++++++-
cli/distributed/Conductor.cpp | 13 ++++++++++++-
cli/distributed/Conductor.hpp | 4 ++++
query_execution/QueryExecutionMessages.proto | 4 ++++
query_execution/QueryExecutionTypedefs.hpp | 2 ++
6 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index b46082f..5804321 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
# Link dependencies:
target_link_libraries(quickstep_cli_distributed_Conductor
glog
+ quickstep_catalog_CatalogDatabase
quickstep_cli_DefaultsConfigurator
quickstep_cli_Flags
quickstep_cli_distributed_Role
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 5af70e6..386654d 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -122,7 +122,10 @@ void Cli::init() {
// Prepare for submitting a query.
bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+ bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
+
bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
}
@@ -191,7 +194,7 @@ void Cli::run() {
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
if (proto.has_result_relation()) {
- CatalogRelation result_relation(proto.result_relation());
+ const CatalogRelation result_relation(proto.result_relation());
PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
@@ -199,6 +202,21 @@ void Cli::run() {
for (const block_id block : blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
+
+ // Notify Conductor to remove the temp query result relation in the Catalog.
+ S::QueryResultTeardownMessage proto_response;
+ proto_response.set_relation_id(result_relation.getID());
+
+ const size_t proto_response_length = proto_response.ByteSize();
+ char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+ CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+ TaggedMessage response_message(static_cast<const void*>(proto_response_bytes),
+ proto_response_length,
+ kQueryResultTeardownMessage);
+ free(proto_response_bytes);
+
+ QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message));
}
std::chrono::duration<double, std::milli> time_in_ms = end - start;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 13d4d57..cf2eb4b 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -28,6 +28,7 @@
#include <string>
#include <utility>
+#include "catalog/CatalogDatabase.hpp"
#include "cli/DefaultsConfigurator.hpp"
#include "cli/Flags.hpp"
#include "parser/ParseStatement.hpp"
@@ -73,6 +74,7 @@ void Conductor::init() {
}
query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+ catalog_database_ = query_processor_->getDefaultDatabase();
} catch (const std::exception &e) {
LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
<< "\nIf you intended to create a new database, "
@@ -93,12 +95,14 @@ void Conductor::init() {
bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
+
block_locator_ = make_unique<BlockLocator>(&bus_);
block_locator_->start();
foreman_ = make_unique<ForemanDistributed>(*block_locator_,
std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
- query_processor_->getDefaultDatabase());
+ catalog_database_);
foreman_->start();
}
@@ -129,6 +133,13 @@ void Conductor::run() {
processSqlQueryMessage(sender, new string(move(proto.sql_query())));
break;
}
+ case kQueryResultTeardownMessage: {
+ S::QueryResultTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ catalog_database_->dropRelationById(proto.relation_id());
+ break;
+ }
default:
LOG(FATAL) << "Unknown TMB message type";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e8c9582..09bf2b9 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -34,6 +34,8 @@
namespace quickstep {
+class CatalogDatabase;
+
/** \addtogroup CliDistributed
* @{
*/
@@ -63,6 +65,8 @@ class Conductor final : public Role {
SqlParserWrapper parser_wrapper_;
std::unique_ptr<QueryProcessor> query_processor_;
+ // Not owned.
+ CatalogDatabase *catalog_database_;
tmb::client_id conductor_client_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 115a9a3..68f286d 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -138,6 +138,10 @@ message QueryExecutionSuccessMessage {
optional CatalogRelationSchema result_relation = 1;
}
+message QueryResultTeardownMessage {
+ required int32 relation_id = 1;
+}
+
message QueryExecutionErrorMessage {
required string error_message = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9f78302..994bd60 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -105,6 +105,8 @@ enum QueryExecutionMessageType : message_type_id {
kQueryExecutionSuccessMessage,
kQueryExecutionErrorMessage,
+ kQueryResultTeardownMessage, // From CLI to Conductor.
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.