You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/14 06:23:04 UTC
incubator-quickstep git commit: Minor fixes for the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 6ee9842fb -> 1325a6ae2
Minor fixes for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1325a6ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1325a6ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1325a6ae
Branch: refs/heads/master
Commit: 1325a6ae2c909fbadb4b0661478f42a5e6687932
Parents: 6ee9842
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:22:41 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Aug 13 23:22:41 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 16 +++++++-------
query_execution/PolicyEnforcerDistributed.cpp | 10 ++++-----
query_execution/PolicyEnforcerDistributed.hpp | 6 +++---
query_execution/QueryExecutionTypedefs.hpp | 4 ++--
query_execution/Shiftboss.cpp | 20 +++++++++++++++---
query_execution/Shiftboss.hpp | 22 +++++++++++++-------
.../tests/execution_generator/CMakeLists.txt | 2 +-
7 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 74fcafb..4033594 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,7 +31,7 @@ endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
@@ -52,12 +52,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,7 +80,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_threading_ThreadUtil
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanBase
glog
quickstep_threading_Thread
@@ -223,7 +223,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
@@ -262,7 +262,7 @@ if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
glog
quickstep_relationaloperators_WorkOrder_proto
@@ -320,7 +320,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
# Tests:
if (ENABLE_DISTRIBUTED)
@@ -346,7 +346,7 @@ if (ENABLE_DISTRIBUTED)
tmb
${LIBS})
add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_executable(QueryManagerSingleNode_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..47491ed 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,16 +58,16 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");
-void PolicyEnforcerDistributed::getWorkOrderMessages(
- vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
+ vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) {
// Iterate over admitted queries until either there are no more
// messages available, or the maximum number of messages have
// been collected.
- DCHECK(work_order_messages->empty());
+ DCHECK(work_order_proto_messages->empty());
// TODO(harshad) - Make this function generic enough so that it
// works well when multiple queries are getting executed.
if (admitted_queries_.empty()) {
- LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
return;
}
@@ -86,7 +86,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
if (next_work_order_message != nullptr) {
++messages_collected_curr_query;
- work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+ work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
} else {
// No more work ordes from the current query at this time.
// Check if the query's execution is over.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..bce3e0c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -76,10 +76,10 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @brief Get work order messages to be dispatched. These messages come from
* the active queries.
*
- * @param work_order_messages The work order messages to be dispatched.
+ * @param work_order_proto_messages The work order messages to be dispatched.
**/
- void getWorkOrderMessages(
- std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+ void getWorkOrderProtoMessages(
+ std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages);
/**
* @brief Process the initiate rebuild work order response message.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..bba67e3 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,8 +63,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
- kAdmitRequestMessage, // Requesting a query (or queries) to be admitted, from
- // the main thread to Foreman.
+ kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from
+ // the main thread to Foreman.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
proto.relation_id());
break;
}
- case kWorkOrderCompleteMessage: // Fall through.
- case kRebuildWorkOrderCompleteMessage:
+ case kCatalogRelationNewBlockMessage: // Fall through.
case kDataPipelineMessage:
- case kWorkOrderFeedbackMessage: {
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrderCompleteMessage:
+ case kRebuildWorkOrderCompleteMessage: {
DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
<< "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
@@ -175,8 +185,12 @@ void Shiftboss::run() {
storage_manager_->saveBlockOrBlob(proto.blocks(i));
}
+ // Clean up query execution states, i.e., QueryContext.
+ query_contexts_.erase(proto.query_id());
+
serialization::SaveQueryResultResponseMessage proto_response;
proto_response.set_relation_id(proto.relation_id());
+ proto_response.set_cli_id(proto.cli_id());
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <unordered_map>
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
// Message sent to Worker.
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
- // Message sent to Foreman.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
// Forward the following message types from Foreman to Workers.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
// Forward the following message types from Workers to Foreman.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ // Clean up query execution states, i.e., QueryContext.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..0c00ff6 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -83,4 +83,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)