You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/04 19:03:55 UTC

[2/4] incubator-quickstep git commit: Clean up query execution states in Shiftboss.

Clean up query execution states in Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9e158490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9e158490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9e158490

Branch: refs/heads/new-distributed-exe-test
Commit: 9e1584901ee3cd3658c426d8c470ed5aabe046d5
Parents: 220fa06
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Jul 30 10:59:31 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Jul 30 10:59:31 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        |  1 +
 query_execution/PolicyEnforcerDistributed.cpp | 42 +++++++++++++++++-----
 query_execution/QueryExecutionMessages.proto  |  4 +++
 query_execution/QueryExecutionTypedefs.hpp    |  2 ++
 query_execution/Shiftboss.cpp                 |  9 +++++
 query_execution/Shiftboss.hpp                 |  3 ++
 6 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 1c0fba8..2632113 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -67,6 +67,7 @@ ForemanDistributed::ForemanDistributed(
       kQueryInitiateMessage,
       kWorkOrderMessage,
       kInitiateRebuildMessage,
+      kQueryTeardownMessage,
       kSaveQueryResultMessage,
       kQueryExecutionSuccessMessage,
       kPoisonMessage};

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 59df3de..087e0b3 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -196,29 +196,53 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
 
 void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
   const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  const tmb::client_id cli_id = query_handle->getClientId();
+  const std::size_t query_id = query_handle->query_id();
+
   if (query_result == nullptr) {
-    // TODO(zuyu): notify Shiftboss to remove QueryContext.
-    TaggedMessage message(kQueryExecutionSuccessMessage);
+    // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+    serialization::QueryTeardownMessage proto;
+    proto.set_query_id(query_id);
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryTeardownMessage);
+
+    // TODO(zuyu): Support multiple shiftbosses.
+    LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+              << "') to Shiftboss 0";
+    tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           foreman_client_id_,
+                                           shiftboss_directory_->getClientId(0),
+                                           move(message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+        << " to Shiftboss";
 
-    const tmb::client_id cli_id = query_handle->getClientId();
+    TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
-    // Notify the CLI regarding the query execution result.
+    // Notify the CLI that the query executed successfully.
     LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
               << "') to CLI with TMB client id " << cli_id;
-    const tmb::MessageBus::SendStatus send_status =
+    send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            cli_id,
-                                           move(message));
+                                           move(cli_message));
     CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
         << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
         << " to CLI with TMB client ID " << cli_id;
     return;
   }
 
-  // SaveQueryResultMessage implies QueryContext clean up in Shiftboss.
+  // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
   S::SaveQueryResultMessage proto;
-  proto.set_query_id(query_handle->query_id());
+  proto.set_query_id(query_id);
   proto.set_relation_id(query_result->getID());
 
   const vector<block_id> blocks(query_result->getBlocksSnapshot());
@@ -226,7 +250,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
     proto.add_blocks(block);
   }
 
-  proto.set_cli_id(query_handle->getClientId());
+  proto.set_cli_id(cli_id);
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 99de75c..b182b87 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -114,6 +114,10 @@ message InitiateRebuildResponseMessage {
   required uint64 shiftboss_index = 4;
 }
 
+message QueryTeardownMessage {
+  required uint64 query_id = 1;
+}
+
 message SaveQueryResultMessage {
   required uint64 query_id = 1;
   required int32 relation_id = 2;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0d43237..5706c82 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,8 @@ enum QueryExecutionMessageType : message_type_id {
   kInitiateRebuildMessage,  // From Foreman to Shiftboss.
   kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryTeardownMessage,  // From Foreman to Shiftboss.
+
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 925dc1f..978fe34 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -157,6 +157,15 @@ void Shiftboss::run() {
             << " to Foreman with TMB client ID " << foreman_client_id_;
         break;
       }
+      case kQueryTeardownMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryTeardownMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        query_contexts_.erase(proto.query_id());
+        break;
+      }
       case kSaveQueryResultMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9e24d62..3943938 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -115,6 +115,9 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
 
+    // Clean up query execution states, i.e., QueryContext.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
+
     // Stop itself.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
     // Stop all workers.