You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/12 16:02:55 UTC
[1/2] incubator-quickstep git commit: Fixed a bug in deserializing
WindowAggrWorkOrder. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/dist-q-opt-exe-gen-test 5f450d7c0 -> bf25af99b (forced update)
Fixed a bug in deserializing WindowAggrWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6ee9842f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6ee9842f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6ee9842f
Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: 6ee9842fbebd8dac38c35c383bad018b1b7fcb09
Parents: d9135a8
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Aug 12 08:59:12 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Aug 12 08:59:12 2016 -0700
----------------------------------------------------------------------
relational_operators/WorkOrderFactory.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ee9842f/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 7d7af59..6970486 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
move(blocks),
query_context->getInsertDestination(
- proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
}
default:
LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
[2/2] incubator-quickstep git commit: Added ForemanDistributed and
unit tests.
Posted by zu...@apache.org.
Added ForemanDistributed and unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bf25af99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bf25af99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bf25af99
Branch: refs/heads/dist-q-opt-exe-gen-test
Commit: bf25af99bdd5ed7ea7645180f4468f01603387b0
Parents: 6ee9842
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 11:57:35 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Aug 12 08:59:59 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 40 ++-
query_execution/ForemanDistributed.cpp | 334 +++++++++++++++++++
query_execution/ForemanDistributed.hpp | 130 ++++++++
query_execution/PolicyEnforcerDistributed.cpp | 4 +-
query_execution/PolicyEnforcerDistributed.hpp | 2 +-
query_execution/QueryExecutionTypedefs.hpp | 2 +-
query_execution/Shiftboss.cpp | 20 +-
query_execution/Shiftboss.hpp | 22 +-
query_optimizer/tests/CMakeLists.txt | 40 +++
.../tests/DistributedExecutionGeneratorTest.cpp | 57 ++++
.../DistributedExecutionGeneratorTestRunner.cpp | 121 +++++++
.../DistributedExecutionGeneratorTestRunner.hpp | 143 ++++++++
.../tests/execution_generator/CMakeLists.txt | 72 +++-
13 files changed, 964 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 74fcafb..1b27194 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,8 +31,11 @@ endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
if (ENABLE_DISTRIBUTED)
@@ -52,12 +55,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,12 +83,32 @@ if (ENABLE_DISTRIBUTED)
quickstep_threading_ThreadUtil
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanBase
glog
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_ForemanDistributed
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_Catalog_proto
+ quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_PolicyEnforcerDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_threading_ThreadUtil
+ quickstep_utility_EqualsAnyConstant
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
@@ -223,7 +246,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
@@ -262,7 +285,7 @@ if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
glog
quickstep_relationaloperators_WorkOrder_proto
@@ -316,11 +339,12 @@ target_link_libraries(quickstep_queryexecution
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_ForemanDistributed
quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
# Tests:
if (ENABLE_DISTRIBUTED)
@@ -346,7 +370,7 @@ if (ENABLE_DISTRIBUTED)
tmb
${LIBS})
add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_executable(QueryManagerSingleNode_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..3a20459
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,334 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+class QueryHandle;
+
+ForemanDistributed::ForemanDistributed(
+ MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ const int cpu_id,
+ const bool profile_individual_workorders)
+ : ForemanBase(bus, cpu_id),
+ catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+ const std::vector<QueryExecutionMessageType> sender_message_types{
+ kShiftbossRegistrationResponseMessage,
+ kQueryInitiateMessage,
+ kWorkOrderMessage,
+ kInitiateRebuildMessage,
+ kQueryTeardownMessage,
+ kSaveQueryResultMessage,
+ kQueryExecutionSuccessMessage,
+ kPoisonMessage};
+
+ for (const auto message_type : sender_message_types) {
+ bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+ }
+
+ const std::vector<QueryExecutionMessageType> receiver_message_types{
+ kShiftbossRegistrationMessage,
+ kAdmitRequestMessage,
+ kQueryInitiateResponseMessage,
+ kCatalogRelationNewBlockMessage,
+ kDataPipelineMessage,
+ kInitiateRebuildResponseMessage,
+ kWorkOrderCompleteMessage,
+ kRebuildWorkOrderCompleteMessage,
+ kWorkOrderFeedbackMessage,
+ kSaveQueryResultResponseMessage,
+ kPoisonMessage};
+
+ for (const auto message_type : receiver_message_types) {
+ bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+ }
+
+ policy_enforcer_.reset(new PolicyEnforcerDistributed(
+ foreman_client_id_,
+ catalog_database_,
+ &shiftboss_directory_,
+ bus_,
+ profile_individual_workorders));
+}
+
+void ForemanDistributed::run() {
+ if (cpu_id_ >= 0) {
+ // We can pin the foreman thread to a CPU if specified.
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ // Ensure that at least one Shiftboss to register.
+ if (shiftboss_directory_.empty()) {
+ const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+ LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
+
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+ DCHECK_EQ(1u, shiftboss_directory_.size());
+ }
+
+ // Event loop
+ for (;;) {
+ // Receive() causes this thread to sleep until next message is received.
+ const AnnotatedMessage annotated_message =
+ bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ LOG(INFO) << "ForemanDistributed received typed '" << message_type
+ << "' message from client " << annotated_message.sender;
+ switch (message_type) {
+ case kShiftbossRegistrationMessage: {
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+ break;
+ }
+ case kAdmitRequestMessage: {
+ const AdmitRequestMessage *request_message =
+ static_cast<const AdmitRequestMessage*>(tagged_message.message());
+
+ const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
+ DCHECK(!query_handles.empty());
+
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
+ }
+ break;
+ }
+ case kQueryInitiateResponseMessage: {
+ // TODO(zuyu): check the query id.
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage: {
+ policy_enforcer_->processMessage(tagged_message);
+ break;
+ }
+ case kInitiateRebuildResponseMessage: {
+ // A unique case in the distributed version.
+ policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+ break;
+ }
+ case kSaveQueryResultResponseMessage: {
+ S::SaveQueryResultResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ break;
+ }
+ case kPoisonMessage: {
+ if (policy_enforcer_->hasQueries()) {
+ LOG(WARNING) << "Foreman thread exiting while some queries are "
+ "under execution or waiting to be admitted";
+ }
+
+ // Shutdown all Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+ }
+
+ tmb::MessageStyle broadcast_style;
+ broadcast_style.Broadcast(true);
+
+ TaggedMessage poison_message(kPoisonMessage);
+
+ const MessageBus::SendStatus send_status =
+ bus_->Send(foreman_client_id_,
+ shiftboss_addresses,
+ broadcast_style,
+ move(poison_message));
+ DCHECK(send_status == MessageBus::SendStatus::kOK);
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type to Foreman";
+ }
+
+ if (canCollectNewMessages(message_type)) {
+ vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+ policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+ dispatchWorkOrderMessages(new_messages);
+ }
+ }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+ return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+ kCatalogRelationNewBlockMessage,
+ kWorkOrderFeedbackMessage) &&
+ // TODO(zuyu): Multiple Shiftbosses support.
+ !shiftboss_directory_.hasReachedCapacity(0);
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+ for (const auto &message : messages) {
+ DCHECK(message != nullptr);
+ // TODO(zuyu): Multiple Shiftbosses support.
+ sendWorkOrderMessage(0, *message);
+ shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+ }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+ const S::WorkOrderMessage &proto) {
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kWorkOrderMessage);
+ free(proto_bytes);
+
+ const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+ LOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
+ << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_client_id,
+ move(message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const {
+ const std::vector<WorkOrderTimeEntry> &recorded_times =
+ policy_enforcer_->getProfilingResults(query_id);
+ fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+ for (const auto &workorder_entry : recorded_times) {
+ const std::size_t worker_id = workorder_entry.worker_id;
+ fprintf(out,
+ "%lu,%lu,%lu,%lu\n",
+ query_id,
+ worker_id,
+ workorder_entry.operator_id, // Operator ID.
+ workorder_entry.end_time - workorder_entry.start_time); // Time.
+ }
+}
+
+void ForemanDistributed::processShiftbossRegisterationMessage(const client_id shiftboss_client_id,
+ const std::size_t work_order_capacity) {
+ S::ShiftbossRegistrationResponseMessage proto;
+ proto.set_shiftboss_index(shiftboss_directory_.size());
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kShiftbossRegistrationResponseMessage);
+ free(proto_bytes);
+
+ shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+ LOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+ << kShiftbossRegistrationResponseMessage
+ << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_client_id,
+ move(message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
+ const relation_id result_relation_id) {
+ S::QueryExecutionSuccessMessage proto;
+ proto.mutable_result_relation()->MergeFrom(
+ static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
+
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryExecutionSuccessMessage);
+ free(proto_bytes);
+
+ // Notify the CLI regarding the query result.
+ LOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+ << "') to CLI with TMB client id " << cli_id;
+ const MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ cli_id,
+ move(message));
+ CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..4404e29
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ * policy enforcer and dispatches the work to Shiftbosses. It also
+ * receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param bus A pointer to the TMB.
+ * @param catalog_database The catalog database where this query is executed.
+ * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+ * @param profile_individual_workorders Whether every workorder's execution
+ * be profiled or not.
+ *
+ * @note If cpu_id is not specified, Foreman thread can be possibly moved
+ * around on different CPUs by the OS.
+ **/
+ ForemanDistributed(tmb::MessageBus *bus,
+ CatalogDatabaseLite *catalog_database,
+ const int cpu_id = -1,
+ const bool profile_individual_workorders = false);
+
+ ~ForemanDistributed() override {}
+
+ /**
+ * @brief Print the results of profiling individual work orders for a given
+ * query.
+ *
+ * TODO(harshad) - Add the name of the operator to the output.
+ *
+ * @param query_id The ID of the query for which the results are to be printed.
+ * @param out The file stream.
+ **/
+ void printWorkOrderProfilingResults(const std::size_t query_id,
+ std::FILE *out) const;
+
+ protected:
+ void run() override;
+
+ private:
+ /**
+ * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
+ * worker threads.
+ *
+ * @param messages The messages to be dispatched.
+ **/
+ void dispatchWorkOrderMessages(
+ const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+ /**
+ * @brief Send the given message to the specified worker.
+ *
+ * @param worker_index The logical index of the recipient worker in
+ * ShiftbossDirectory.
+ * @param proto The WorkOrderMessage to be sent.
+ **/
+ void sendWorkOrderMessage(const std::size_t worker_index,
+ const serialization::WorkOrderMessage &proto);
+
+ void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id,
+ const std::size_t work_order_capacity);
+
+ void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+ const relation_id result_relation_id);
+
+ /**
+ * @brief Check if we can collect new messages from the PolicyEnforcer.
+ *
+ * @param message_type The type of the last received message.
+ **/
+ bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+ ShiftbossDirectory shiftboss_directory_;
+
+ CatalogDatabaseLite *catalog_database_;
+
+ std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+ DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..3fba899 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,7 +58,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");
-void PolicyEnforcerDistributed::getWorkOrderMessages(
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
// Iterate over admitted queries until either there are no more
// messages available, or the maximum number of messages have
@@ -67,7 +67,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
// TODO(harshad) - Make this function generic enough so that it
// works well when multiple queries are getting executed.
if (admitted_queries_.empty()) {
- LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..f2ab23a 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -78,7 +78,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
*
* @param work_order_messages The work order messages to be dispatched.
**/
- void getWorkOrderMessages(
+ void getWorkOrderProtoMessages(
std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..0ad77c7 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,7 +63,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
- kAdmitRequestMessage, // Requesting a query (or queries) to be admitted, from
+ kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from
// the main thread to Foreman.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
proto.relation_id());
break;
}
- case kWorkOrderCompleteMessage: // Fall through.
- case kRebuildWorkOrderCompleteMessage:
+ case kCatalogRelationNewBlockMessage: // Fall through.
case kDataPipelineMessage:
- case kWorkOrderFeedbackMessage: {
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrderCompleteMessage:
+ case kRebuildWorkOrderCompleteMessage: {
DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
<< "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
@@ -175,8 +185,12 @@ void Shiftboss::run() {
storage_manager_->saveBlockOrBlob(proto.blocks(i));
}
+ // Clean up query execution states, i.e., QueryContext.
+ query_contexts_.erase(proto.query_id());
+
serialization::SaveQueryResultResponseMessage proto_response;
proto_response.set_relation_id(proto.relation_id());
+ proto_response.set_cli_id(proto.cli_id());
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <unordered_map>
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
// Message sent to Worker.
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
- // Message sent to Foreman.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
// Forward the following message types from Foreman to Workers.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
// Forward the following message types from Workers to Foreman.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ // Clean up query execution states, i.e., QueryContext.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 1453291..357313c 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -80,6 +80,14 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+ DistributedExecutionGeneratorTest.cpp
+ DistributedExecutionGeneratorTestRunner.cpp
+ DistributedExecutionGeneratorTestRunner.hpp
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
ExecutionGeneratorTest.cpp
ExecutionGeneratorTestRunner.cpp
@@ -109,6 +117,38 @@ add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+ glog
+ gtest
+ gtest_main
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_DropRelation
+ quickstep_cli_PrintToScreen
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryoptimizer_ExecutionGenerator
+ quickstep_queryoptimizer_LogicalGenerator
+ quickstep_queryoptimizer_OptimizerContext
+ quickstep_queryoptimizer_PhysicalGenerator
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_utility_Macros
+ quickstep_utility_MemStream
+ quickstep_utility_SqlError
+ quickstep_utility_TextBasedTestDriver
+ tmb
+ ${LIBS})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
${GFLAGS_LIB_NAME}
glog
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
new file mode 100644
index 0000000..ae14e03
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTest.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
+
+int main(int argc, char** argv) {
+ google::InitGoogleLogging(argv[0]);
+ // Honor FLAGS_buffer_pool_slots in StorageManager.
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ if (argc < 4) {
+ LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1
+ << " are provided";
+ }
+
+ std::ifstream input_file(argv[1]);
+ CHECK(input_file.is_open()) << argv[1];
+ std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
+ test_runner(
+ new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
+ test_driver.reset(
+ new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+ test_driver->registerOption(
+ quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ const int success = RUN_ALL_TESTS();
+ if (success != 0) {
+ test_driver->writeActualOutputToFile(argv[2]);
+ }
+
+ return success;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
new file mode 100644
index 0000000..45851f1
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+
+#include <cstdio>
+#include <set>
+#include <string>
+
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/ExecutionGenerator.hpp"
+#include "query_optimizer/LogicalGenerator.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/PhysicalGenerator.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace optimizer {
+
+const char *DistributedExecutionGeneratorTestRunner::kResetOption =
+ "reset_before_execution";
+
+void DistributedExecutionGeneratorTestRunner::runTestCase(
+ const std::string &input, const std::set<std::string> &options,
+ std::string *output) {
+ // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+ VLOG(4) << "Test SQL(s): " << input;
+
+ if (options.find(kResetOption) != options.end()) {
+ test_database_loader_.clear();
+ test_database_loader_.createTestRelation(false /* allow_vchar */);
+ test_database_loader_.loadTestRelation();
+ }
+
+ MemStream output_stream;
+ sql_parser_.feedNextBuffer(new std::string(input));
+
+ while (true) {
+ ParseResult result = sql_parser_.getNextStatement();
+
+ OptimizerContext optimizer_context(test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager());
+
+ if (result.condition != ParseResult::kSuccess) {
+ if (result.condition == ParseResult::kError) {
+ *output = result.error_message;
+ }
+ break;
+ }
+
+ std::printf("%s\n", result.parsed_statement->toString().c_str());
+ try {
+ QueryHandle query_handle(query_id_++, cli_id_);
+ LogicalGenerator logical_generator(&optimizer_context);
+ PhysicalGenerator physical_generator;
+ ExecutionGenerator execution_generator(&optimizer_context,
+ &query_handle);
+
+ const physical::PhysicalPtr physical_plan =
+ physical_generator.generatePlan(
+ logical_generator.generatePlan(*result.parsed_statement));
+ execution_generator.generatePlan(physical_plan);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ cli_id_,
+ foreman_->getBusClientID(),
+ &query_handle,
+ &bus_);
+
+ const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+ DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+ const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
+ if (query_result_relation) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ test_database_loader_.storage_manager(),
+ output_stream.file());
+ DropRelation::Drop(*query_result_relation,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager());
+ }
+ } catch (const SqlError &error) {
+ *output = error.formatMessage(input);
+ break;
+ }
+ }
+
+ if (output->empty()) {
+ *output = output_stream.str();
+ }
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
new file mode 100644
index 0000000..ae81013
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -0,0 +1,143 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/**
+ * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
+ * distributed version.
+ */
+class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
+ public:
+ /**
+ * @brief If this option is enabled, recreate the entire database and
+ * repopulate the data before every test.
+ */
+ static const char *kResetOption;
+
+ /**
+ * @brief Constructor.
+ */
+ explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path)
+ : query_id_(0),
+ test_database_loader_(storage_path) {
+ test_database_loader_.createTestRelation(false /* allow_vchar */);
+ test_database_loader_.loadTestRelation();
+
+ bus_.Initialize();
+
+ // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+ // could receive a registration message from the latter.
+ foreman_.reset(new ForemanDistributed(&bus_, test_database_loader_.catalog_database()));
+
+ worker_.reset(new Worker(0 /* worker_thread_index */, &bus_));
+
+ std::vector<tmb::client_id> worker_client_ids;
+ worker_client_ids.push_back(worker_->getBusClientID());
+
+ // We don't use the NUMA aware version of worker code.
+ const std::vector<numa_node_id> numa_nodes(worker_client_ids.size(), kAnyNUMANodeID);
+
+ workers_.reset(
+ new WorkerDirectory(worker_client_ids.size(), worker_client_ids, numa_nodes));
+
+ shiftboss_.reset(new Shiftboss(&bus_, test_database_loader_.storage_manager(), workers_.get()));
+
+ cli_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+ foreman_->start();
+
+ shiftboss_->start();
+ worker_->start();
+ }
+
+ ~DistributedExecutionGeneratorTestRunner() {
+ tmb::TaggedMessage poison_tagged_message(quickstep::kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(
+ &bus_,
+ cli_id_,
+ foreman_->getBusClientID(),
+ std::move(poison_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+ worker_->join();
+ shiftboss_->join();
+
+ foreman_->join();
+ }
+
+ void runTestCase(const std::string &input,
+ const std::set<std::string> &options,
+ std::string *output) override;
+
+ private:
+ std::size_t query_id_;
+
+ SqlParserWrapper sql_parser_;
+ TestDatabaseLoader test_database_loader_;
+
+ MessageBusImpl bus_;
+
+ tmb::client_id cli_id_;
+
+ std::unique_ptr<ForemanDistributed> foreman_;
+
+ std::unique_ptr<Worker> worker_;
+ std::unique_ptr<WorkerDirectory> workers_;
+
+ std::unique_ptr<Shiftboss> shiftboss_;
+
+ DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
+};
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bf25af99/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..1ea6a17 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,6 +15,63 @@
# specific language governing permissions and limitations
# under the License.
+if (ENABLE_DISTRIBUTED)
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
+ add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
+ "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
+endif(ENABLE_DISTRIBUTED)
add_test(quickstep_queryoptimizer_tests_executiongenerator_create
"../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
"${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -76,6 +133,19 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
+if (ENABLE_DISTRIBUTED)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
+ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
+endif(ENABLE_DISTRIBUTED)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -83,4 +153,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)