You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/22 18:56:11 UTC

[02/22] incubator-quickstep git commit: Minor fixes for the distributed version.

Minor fixes for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1325a6ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1325a6ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1325a6ae

Branch: refs/heads/LIP-for-tpch-merged
Commit: 1325a6ae2c909fbadb4b0661478f42a5e6687932
Parents: 6ee9842
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:22:41 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Aug 13 23:22:41 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  | 16 +++++++-------
 query_execution/PolicyEnforcerDistributed.cpp   | 10 ++++-----
 query_execution/PolicyEnforcerDistributed.hpp   |  6 +++---
 query_execution/QueryExecutionTypedefs.hpp      |  4 ++--
 query_execution/Shiftboss.cpp                   | 20 +++++++++++++++---
 query_execution/Shiftboss.hpp                   | 22 +++++++++++++-------
 .../tests/execution_generator/CMakeLists.txt    |  2 +-
 7 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 74fcafb..4033594 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,7 +31,7 @@ endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
@@ -52,12 +52,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
 add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
   add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,7 +80,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_threading_ThreadUtil
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanBase
                       glog
                       quickstep_threading_Thread
@@ -223,7 +223,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_DAG
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
@@ -262,7 +262,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
                       glog
                       quickstep_relationaloperators_WorkOrder_proto
@@ -320,7 +320,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)
@@ -346,7 +346,7 @@ if (ENABLE_DISTRIBUTED)
                         tmb
                         ${LIBS})
   add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 add_executable(QueryManagerSingleNode_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..47491ed 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,16 +58,16 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " can be allocated in a single round of dispatch of messages to"
               " the workers.");
 
-void PolicyEnforcerDistributed::getWorkOrderMessages(
-    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) {
   // Iterate over admitted queries until either there are no more
   // messages available, or the maximum number of messages have
   // been collected.
-  DCHECK(work_order_messages->empty());
+  DCHECK(work_order_proto_messages->empty());
   // TODO(harshad) - Make this function generic enough so that it
   // works well when multiple queries are getting executed.
   if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
     return;
   }
 
@@ -86,7 +86,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
           static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
       if (next_work_order_message != nullptr) {
         ++messages_collected_curr_query;
-        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+        work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
       } else {
         // No more work ordes from the current query at this time.
         // Check if the query's execution is over.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..bce3e0c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -76,10 +76,10 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @brief Get work order messages to be dispatched. These messages come from
    *        the active queries.
    *
-   * @param work_order_messages The work order messages to be dispatched.
+   * @param work_order_proto_messages The work order messages to be dispatched.
    **/
-  void getWorkOrderMessages(
-      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+  void getWorkOrderProtoMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages);
 
   /**
    * @brief Process the initiate rebuild work order response message.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..bba67e3 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,8 +63,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
 
 // We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
-  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted, from
-                         // the main thread to Foreman.
+  kAdmitRequestMessage = 0,  // Requesting a query (or queries) to be admitted, from
+                             // the main thread to Foreman.
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
                                       proto.relation_id());
         break;
       }
-      case kWorkOrderCompleteMessage:  // Fall through.
-      case kRebuildWorkOrderCompleteMessage:
+      case kCatalogRelationNewBlockMessage:  // Fall through.
       case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage: {
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrderCompleteMessage:
+      case kRebuildWorkOrderCompleteMessage: {
         DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                    << "') forwarded typed '" << annotated_message.tagged_message.message_type()
                    << "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
         CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
+      case kQueryTeardownMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryTeardownMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        query_contexts_.erase(proto.query_id());
+        break;
+      }
       case kSaveQueryResultMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
@@ -175,8 +185,12 @@ void Shiftboss::run() {
           storage_manager_->saveBlockOrBlob(proto.blocks(i));
         }
 
+        // Clean up query execution states, i.e., QueryContext.
+        query_contexts_.erase(proto.query_id());
+
         serialization::SaveQueryResultResponseMessage proto_response;
         proto_response.set_relation_id(proto.relation_id());
+        proto_response.set_cli_id(proto.cli_id());
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <unordered_map>
 
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
 
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
     // Message sent to Worker.
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
 
-    // Message sent to Foreman.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
     // Forward the following message types from Foreman to Workers.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
 
     // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
 
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+    // Clean up query execution states, i.e., QueryContext.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
 
     // Stop itself.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..0c00ff6 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -83,4 +83,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)