You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/10 14:45:38 UTC

[17/31] incubator-quickstep git commit: Removed the temp query result relation in the distributed version.

Removed the temp query result relation in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aa7f6fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aa7f6fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aa7f6fe4

Branch: refs/heads/reorder-partitioned-hash-join
Commit: aa7f6fe4e07804524aca0f1574935ae3f73c985d
Parents: dd8747f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 00:33:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 00:35:53 2017 -0800

----------------------------------------------------------------------
 cli/distributed/CMakeLists.txt               |  1 +
 cli/distributed/Cli.cpp                      | 20 +++++++++++++++++++-
 cli/distributed/Conductor.cpp                | 13 ++++++++++++-
 cli/distributed/Conductor.hpp                |  4 ++++
 query_execution/QueryExecutionMessages.proto |  4 ++++
 query_execution/QueryExecutionTypedefs.hpp   |  2 ++
 6 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index b46082f..5804321 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 # Link dependencies:
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
+                      quickstep_catalog_CatalogDatabase
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 5af70e6..386654d 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -122,7 +122,10 @@ void Cli::init() {
 
   // Prepare for submitting a query.
   bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+  bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
+
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
 }
 
@@ -191,7 +194,7 @@ void Cli::run() {
             CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
             if (proto.has_result_relation()) {
-              CatalogRelation result_relation(proto.result_relation());
+              const CatalogRelation result_relation(proto.result_relation());
 
               PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
 
@@ -199,6 +202,21 @@ void Cli::run() {
               for (const block_id block : blocks) {
                 storage_manager_->deleteBlockOrBlobFile(block);
               }
+
+              // Notify Conductor to remove the temp query result relation in the Catalog.
+              S::QueryResultTeardownMessage proto_response;
+              proto_response.set_relation_id(result_relation.getID());
+
+              const size_t proto_response_length = proto_response.ByteSize();
+              char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+              CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+              TaggedMessage response_message(static_cast<const void*>(proto_response_bytes),
+                                             proto_response_length,
+                                             kQueryResultTeardownMessage);
+              free(proto_response_bytes);
+
+              QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message));
             }
 
             std::chrono::duration<double, std::milli> time_in_ms = end - start;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 13d4d57..cf2eb4b 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -28,6 +28,7 @@
 #include <string>
 #include <utility>
 
+#include "catalog/CatalogDatabase.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
 #include "parser/ParseStatement.hpp"
@@ -73,6 +74,7 @@ void Conductor::init() {
     }
 
     query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+    catalog_database_ = query_processor_->getDefaultDatabase();
   } catch (const std::exception &e) {
     LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
                << "\nIf you intended to create a new database, "
@@ -93,12 +95,14 @@ void Conductor::init() {
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
 
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
+
   block_locator_ = make_unique<BlockLocator>(&bus_);
   block_locator_->start();
 
   foreman_ = make_unique<ForemanDistributed>(*block_locator_,
                                              std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
-                                             query_processor_->getDefaultDatabase());
+                                             catalog_database_);
   foreman_->start();
 }
 
@@ -129,6 +133,13 @@ void Conductor::run() {
         processSqlQueryMessage(sender, new string(move(proto.sql_query())));
         break;
       }
+      case kQueryResultTeardownMessage: {
+        S::QueryResultTeardownMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        catalog_database_->dropRelationById(proto.relation_id());
+        break;
+      }
       default:
         LOG(FATAL) << "Unknown TMB message type";
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e8c9582..09bf2b9 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -34,6 +34,8 @@
 
 namespace quickstep {
 
+class CatalogDatabase;
+
 /** \addtogroup CliDistributed
  *  @{
  */
@@ -63,6 +65,8 @@ class Conductor final : public Role {
   SqlParserWrapper parser_wrapper_;
 
   std::unique_ptr<QueryProcessor> query_processor_;
+  // Not owned.
+  CatalogDatabase *catalog_database_;
 
   tmb::client_id conductor_client_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 115a9a3..68f286d 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -138,6 +138,10 @@ message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }
 
+message QueryResultTeardownMessage {
+  required int32 relation_id = 1;
+}
+
 message QueryExecutionErrorMessage {
   required string error_message = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9f78302..994bd60 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -105,6 +105,8 @@ enum QueryExecutionMessageType : message_type_id {
   kQueryExecutionSuccessMessage,
   kQueryExecutionErrorMessage,
 
+  kQueryResultTeardownMessage,  // From CLI to Conductor.
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.