You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/06 20:18:25 UTC

[28/38] incubator-quickstep git commit: Removed the unnecessary messages for saving query results in the distributed version.

Removed the unnecessary messages for saving query results in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8643add3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8643add3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8643add3

Branch: refs/heads/reorder-partitioned-hash-join
Commit: 8643add3fc22a85e3db02b80a4d290bd9b5b323e
Parents: 2e7f446
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Mar 4 23:56:48 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Mar 4 23:56:48 2017 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 43 ----------
 query_execution/ForemanDistributed.hpp        |  6 --
 query_execution/PolicyEnforcerDistributed.cpp | 95 ++++++++--------------
 query_execution/QueryExecutionMessages.proto  | 15 ----
 query_execution/QueryExecutionTypedefs.hpp    |  3 -
 query_execution/Shiftboss.cpp                 | 42 ----------
 6 files changed, 35 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 3d47fb6..b59edb5 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -86,7 +86,6 @@ ForemanDistributed::ForemanDistributed(
       kWorkOrderMessage,
       kInitiateRebuildMessage,
       kQueryTeardownMessage,
-      kSaveQueryResultMessage,
       kQueryExecutionSuccessMessage,
       kCommandResponseMessage,
       kPoisonMessage};
@@ -106,7 +105,6 @@ ForemanDistributed::ForemanDistributed(
       kWorkOrderCompleteMessage,
       kRebuildWorkOrderCompleteMessage,
       kWorkOrderFeedbackMessage,
-      kSaveQueryResultResponseMessage,
       kPoisonMessage};
 
   for (const auto message_type : receiver_message_types) {
@@ -203,20 +201,6 @@ void ForemanDistributed::run() {
             processInitiateRebuildResponseMessage(tagged_message);
         break;
       }
-      case kSaveQueryResultResponseMessage: {
-        S::SaveQueryResultResponseMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index());
-
-        // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-        if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
-          processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
-          query_result_saved_shiftbosses_.erase(query_id);
-        }
-        break;
-      }
       case kPoisonMessage: {
         if (policy_enforcer_->hasQueries()) {
           LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
@@ -466,31 +450,4 @@ void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shi
   CHECK(send_status == MessageBus::SendStatus::kOK);
 }
 
-void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
-                                                               const relation_id result_relation_id) {
-  S::QueryExecutionSuccessMessage proto;
-  proto.mutable_result_relation()->MergeFrom(
-      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kQueryExecutionSuccessMessage);
-  free(proto_bytes);
-
-  // Notify the CLI regarding the query result.
-  DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
-             << kQueryExecutionSuccessMessage
-             << "') to CLI with TMB client id " << cli_id;
-  const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         cli_id,
-                                         move(message));
-  CHECK(send_status == MessageBus::SendStatus::kOK);
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 4053b9d..2ce9c5d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -115,9 +115,6 @@ class ForemanDistributed final : public ForemanBase {
   void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
                                            const std::size_t work_order_capacity);
 
-  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
-                                             const relation_id result_relation_id);
-
   /**
    * @brief Check if we can collect new messages from the PolicyEnforcer.
    *
@@ -136,9 +133,6 @@ class ForemanDistributed final : public ForemanBase {
   DataExchangerAsync data_exchanger_;
   std::unique_ptr<StorageManager> storage_manager_;
 
-  // From a query id to a set of Shiftbosses that save query result.
-  std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
-
   DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 424452e..e9faf8c 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -64,6 +64,7 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
+using tmb::MessageBus;
 using tmb::TaggedMessage;
 
 namespace quickstep {
@@ -269,83 +270,57 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
   }
 
-  if (query_result_relation == nullptr) {
-    if (query_processor_) {
-      query_processor_->saveCatalog();
-    }
-
-    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
-    serialization::QueryTeardownMessage proto;
-    proto.set_query_id(query_id);
 
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+  if (query_result_relation) {
+    const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+    if (analyze_query_info) {
+      processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+    } else {
+      S::QueryExecutionSuccessMessage proto;
+      proto.mutable_result_relation()->MergeFrom(query_result_relation->getProto());
 
-    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
-    free(proto_bytes);
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-               << "') to all Shiftbosses";
-    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryExecutionSuccessMessage);
+      free(proto_bytes);
 
-    TaggedMessage cli_message(kQueryExecutionSuccessMessage);
+      // Notify the CLI regarding the query result.
+      DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
+                 << kQueryExecutionSuccessMessage
+                 << "') to CLI with TMB client id " << cli_id;
+      const MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+      CHECK(send_status == MessageBus::SendStatus::kOK);
+    }
+  } else {
+    if (query_processor_) {
+      query_processor_->saveCatalog();
+    }
 
     // Notify the CLI query execution successfully.
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
                << kQueryExecutionSuccessMessage
                << "') to CLI with TMB client id " << cli_id;
-    const tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           foreman_client_id_,
-                                           cli_id,
-                                           move(cli_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-    return;
+    const MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id,
+                                           TaggedMessage(kQueryExecutionSuccessMessage));
+    CHECK(send_status == MessageBus::SendStatus::kOK);
   }
 
-  const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
-  if (analyze_query_info) {
-    processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
-
-    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
-    S::QueryTeardownMessage proto;
-    proto.set_query_id(query_id);
-
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
-    free(proto_bytes);
-
-    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-               << "') to all Shiftbosses";
-    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
-    return;
-  }
-
-  // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
-  S::SaveQueryResultMessage proto;
+  // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+  S::QueryTeardownMessage proto;
   proto.set_query_id(query_id);
-  proto.set_relation_id(query_result_relation->getID());
-
-  const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
-  for (const block_id block : blocks) {
-    proto.add_blocks(block);
-  }
-
-  proto.set_cli_id(cli_id);
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
   free(proto_bytes);
 
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+  DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
              << "') to all Shiftbosses";
   QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
 }
@@ -439,9 +414,9 @@ void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id c
 
       DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
                  << "') to CLI with TMB client id " << cli_id;
-      const tmb::MessageBus::SendStatus send_status =
+      const MessageBus::SendStatus send_status =
           QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
-      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+      CHECK(send_status == MessageBus::SendStatus::kOK);
 
       completed_analyze_relations_.erase(cli_id);
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index a45e8df..dd3c9a7 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -119,21 +119,6 @@ message QueryTeardownMessage {
   required uint64 query_id = 1;
 }
 
-message SaveQueryResultMessage {
-  required uint64 query_id = 1;
-  required int32 relation_id = 2;
-  repeated fixed64 blocks = 3 [packed=true];
-
-  required uint32 cli_id = 4;  // tmb::client_id.
-}
-
-message SaveQueryResultResponseMessage {
-  required uint64 query_id = 1;
-  required int32 relation_id = 2;
-  required uint32 cli_id = 3;  // tmb::client_id.
-  required uint64 shiftboss_index = 4;
-}
-
 message CommandResponseMessage {
   required string command_response = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 081852f..afdac92 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -100,9 +100,6 @@ enum QueryExecutionMessageType : message_type_id {
 
   kQueryTeardownMessage,  // From Foreman to Shiftboss.
 
-  kSaveQueryResultMessage,  // From Foreman to Shiftboss.
-  kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
-
   kQueryExecutionSuccessMessage,  // From Foreman to CLI.
 
   // From Foreman / Conductor to CLI.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8643add3/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2f7dc3c..fa922f0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -107,9 +107,6 @@ Shiftboss::Shiftboss(tmb::MessageBus *bus_global,
   bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
   bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
 
-  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
-  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
-
   // Message sent to Worker.
   bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
   bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
@@ -230,45 +227,6 @@ void Shiftboss::run() {
           query_contexts_.erase(proto.query_id());
           break;
         }
-        case kSaveQueryResultMessage: {
-          const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-          serialization::SaveQueryResultMessage proto;
-          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-          for (int i = 0; i < proto.blocks_size(); ++i) {
-            storage_manager_->saveBlockOrBlob(proto.blocks(i));
-          }
-
-          // Clean up query execution states, i.e., QueryContext.
-          query_contexts_.erase(proto.query_id());
-
-          serialization::SaveQueryResultResponseMessage proto_response;
-          proto_response.set_query_id(proto.query_id());
-          proto_response.set_relation_id(proto.relation_id());
-          proto_response.set_cli_id(proto.cli_id());
-          proto_response.set_shiftboss_index(shiftboss_index_);
-
-          const size_t proto_response_length = proto_response.ByteSize();
-          char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-          CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-          TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                         proto_response_length,
-                                         kSaveQueryResultResponseMessage);
-          free(proto_response_bytes);
-
-          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
-                     << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                     << "') to Foreman with TMB client ID " << foreman_client_id_;
-          const MessageBus::SendStatus send_status =
-              QueryExecutionUtil::SendTMBMessage(bus_global_,
-                                                 shiftboss_client_id_global_,
-                                                 foreman_client_id_,
-                                                 move(message_response));
-          CHECK(send_status == MessageBus::SendStatus::kOK);
-          break;
-        }
         case kPoisonMessage: {
           DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
                      << "') forwarded PoisonMessage (typed '" << kPoisonMessage