You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/04 19:03:54 UTC
[1/4] incubator-quickstep git commit: Added the distributed execution
engine and tests. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/new-distributed-exe-test 01c48c15b -> 738ffe9a7 (forced update)
Added the distributed execution engine and tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/220fa06f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/220fa06f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/220fa06f
Branch: refs/heads/new-distributed-exe-test
Commit: 220fa06fff04ffffe14bf24ee090c5bea0b5f55d
Parents: aaecc76
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 22 11:31:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Jul 29 15:59:10 2016 -0700
----------------------------------------------------------------------
query_execution/AdmitRequestMessage.hpp | 7 +
query_execution/CMakeLists.txt | 50 ++-
query_execution/ForemanDistributed.cpp | 333 +++++++++++++++++++
query_execution/ForemanDistributed.hpp | 130 ++++++++
query_execution/PolicyEnforcerBase.cpp | 3 +
query_execution/PolicyEnforcerBase.hpp | 7 +
query_execution/PolicyEnforcerDistributed.cpp | 253 ++++++++++++++
query_execution/PolicyEnforcerDistributed.hpp | 112 +++++++
query_execution/QueryExecutionMessages.proto | 16 +-
query_execution/QueryExecutionTypedefs.hpp | 4 +
query_execution/QueryManagerBase.cpp | 3 +-
query_execution/QueryManagerBase.hpp | 26 +-
query_execution/QueryManagerDistributed.cpp | 41 ++-
query_execution/QueryManagerDistributed.hpp | 10 +-
query_execution/Shiftboss.cpp | 89 +++--
query_optimizer/CMakeLists.txt | 4 +
query_optimizer/QueryHandle.hpp | 26 ++
query_optimizer/tests/CMakeLists.txt | 41 +++
.../tests/DistributedExecutionGeneratorTest.cpp | 57 ++++
.../DistributedExecutionGeneratorTestRunner.cpp | 122 +++++++
.../DistributedExecutionGeneratorTestRunner.hpp | 146 ++++++++
.../tests/execution_generator/CMakeLists.txt | 68 +++-
third_party/tmb/include/tmb/tagged_message.h | 9 +
23 files changed, 1511 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index e33b354..75c5ff6 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -60,6 +60,13 @@ class AdmitRequestMessage {
return query_handles_;
}
+ /**
+ * @brief Get the mutable query handles from this message.
+ *
+ * @return A pointer to the vector of query handles stored in this message.
+ **/
+ std::vector<QueryHandle*>* getQueryHandlesMutable() {
+ return &query_handles_;
+ }
+
private:
std::vector<QueryHandle*> query_handles_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8bf1ab1..cfb72d7 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -33,8 +33,14 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+# The distributed Foreman library is only declared when ENABLE_DISTRIBUTED is set.
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+# The distributed PolicyEnforcer library is only declared when ENABLE_DISTRIBUTED is set.
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
@@ -83,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+# Link dependencies of ForemanDistributed; guarded because the target itself
+# only exists when ENABLE_DISTRIBUTED is set.
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_ForemanDistributed
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_PolicyEnforcerDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_threading_ThreadUtil
+ quickstep_utility_EqualsAnyConstant
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
@@ -110,6 +136,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
tmb)
+# Link dependencies of PolicyEnforcerDistributed; guarded because the target
+# itself only exists when ENABLE_DISTRIBUTED is set.
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+ glog
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_Catalog_proto
+ quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryContext_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_QueryManagerDistributed
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
glog
quickstep_catalog_CatalogTypedefs
@@ -294,10 +340,12 @@ target_link_libraries(quickstep_queryexecution
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
# Tests:
if (ENABLE_DISTRIBUTED)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..1c0fba8
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,333 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+ForemanDistributed::ForemanDistributed(
+    tmb::MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  // Message types this Foreman registers itself as a sender of.
+  const std::vector<QueryExecutionMessageType> outgoing_types{
+      kShiftbossRegistrationResponseMessage,
+      kQueryInitiateMessage,
+      kWorkOrderMessage,
+      kInitiateRebuildMessage,
+      kSaveQueryResultMessage,
+      kQueryExecutionSuccessMessage,
+      kPoisonMessage};
+  for (const QueryExecutionMessageType outgoing_type : outgoing_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, outgoing_type);
+  }
+
+  // Message types this Foreman registers itself as a receiver of.
+  const std::vector<QueryExecutionMessageType> incoming_types{
+      kShiftbossRegistrationMessage,
+      kAdmitRequestMessage,
+      kQueryInitiateResponseMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kInitiateRebuildResponseMessage,
+      kWorkOrderCompleteMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kWorkOrdersAvailableMessage,
+      kSaveQueryResultResponseMessage,
+      kPoisonMessage};
+  for (const QueryExecutionMessageType incoming_type : incoming_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, incoming_type);
+  }
+
+  // The policy enforcer shares this Foreman's client ID, catalog, Shiftboss
+  // directory and bus.
+  policy_enforcer_.reset(
+      new PolicyEnforcerDistributed(foreman_client_id_,
+                                    catalog_database_,
+                                    &shiftboss_directory_,
+                                    bus_,
+                                    profile_individual_workorders));
+}
+
+// Body of the Foreman thread: waits for a first Shiftboss registration, then
+// loops receiving messages from the bus and handling them by type until a
+// poison message arrives.
+void ForemanDistributed::run() {
+ if (cpu_id_ >= 0) {
+ // We can pin the foreman thread to a CPU if specified.
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ // Wait until at least one Shiftboss has registered before entering the
+ // event loop; the first message received is expected to be a registration.
+ if (shiftboss_directory_.empty()) {
+ const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+ LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
+
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+ DCHECK_EQ(1u, shiftboss_directory_.size());
+ }
+
+ // Event loop
+ for (;;) {
+ // Receive() causes this thread to sleep until next message is received.
+ const AnnotatedMessage annotated_message =
+ bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ LOG(INFO) << "ForemanDistributed received typed '" << message_type
+ << "' message from client " << annotated_message.sender;
+ switch (message_type) {
+ case kShiftbossRegistrationMessage: {
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
+ break;
+ }
+ case kAdmitRequestMessage: {
+ // The payload is reinterpreted in place as an AdmitRequestMessage; the
+ // const is cast away so the sender's client ID can be recorded on each
+ // query handle below.
+ AdmitRequestMessage *request_message =
+ const_cast<AdmitRequestMessage*>(
+ static_cast<const AdmitRequestMessage*>(tagged_message.message()));
+
+ vector<QueryHandle *> *query_handles = request_message->getQueryHandlesMutable();
+ DCHECK(!query_handles->empty());
+
+ for (QueryHandle *handle : *query_handles) {
+ handle->setClientId(annotated_message.sender);
+ }
+
+ bool all_queries_admitted = true;
+ if (query_handles->size() == 1u) {
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles->front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(*query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
+ }
+ break;
+ }
+ case kQueryInitiateResponseMessage: {
+ // TODO(zuyu): check the query id.
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrdersAvailableMessage: {
+ // All of these are forwarded unchanged to the policy enforcer.
+ policy_enforcer_->processMessage(tagged_message);
+ break;
+ }
+ case kInitiateRebuildResponseMessage: {
+ // A unique case in the distributed version.
+ policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+ break;
+ }
+ case kSaveQueryResultResponseMessage: {
+ S::SaveQueryResultResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ break;
+ }
+ case kPoisonMessage: {
+ if (policy_enforcer_->hasQueries()) {
+ LOG(WARNING) << "Foreman thread exiting while some queries are "
+ "under execution or waiting to be admitted";
+ }
+
+ // Shutdown all Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+ }
+
+ tmb::MessageStyle broadcast_style;
+ broadcast_style.Broadcast(true);
+
+ TaggedMessage poison_message(kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status =
+ bus_->Send(foreman_client_id_,
+ shiftboss_addresses,
+ broadcast_style,
+ move(poison_message));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+ // Returning here ends the event loop and therefore this thread's run().
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type to Foreman";
+ }
+
+ // After handling the message, fetch any work order messages the policy
+ // enforcer has made available and dispatch them, unless
+ // canCollectNewMessages() rules it out for this message type.
+ if (canCollectNewMessages(message_type)) {
+ vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+ policy_enforcer_->getWorkOrderMessages(&new_messages);
+ dispatchWorkOrderMessages(new_messages);
+ }
+ }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  // Receiving either of these message types does not lead to a collection of
+  // new work order messages.
+  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                    kCatalogRelationNewBlockMessage,
+                                    kWorkOrderFeedbackMessage)) {
+    return false;
+  }
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  const bool shiftboss_is_full = shiftboss_directory_.hasReachedCapacity(0);
+  return !shiftboss_is_full;
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  // TODO(zuyu): Multiple Shiftbosses support.
+  constexpr size_t kTargetShiftbossIndex = 0;
+
+  // Send every message to the single target Shiftboss and record it as queued
+  // in the directory.
+  for (auto it = messages.begin(); it != messages.end(); ++it) {
+    const S::WorkOrderMessage *work_order_proto = it->get();
+    DCHECK(work_order_proto != nullptr);
+    sendWorkOrderMessage(kTargetShiftbossIndex, *work_order_proto);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(kTargetShiftbossIndex);
+  }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  // Serialize the protobuf into a temporary heap buffer.
+  const size_t serialized_size = proto.ByteSize();
+  char *serialized_bytes = static_cast<char*>(malloc(serialized_size));
+  CHECK(proto.SerializeToArray(serialized_bytes, serialized_size));
+
+  // NOTE(review): the buffer is freed right after the TaggedMessage is
+  // constructed, so TaggedMessage presumably copies the payload -- confirm.
+  TaggedMessage work_order_message(static_cast<const void*>(serialized_bytes),
+                                   serialized_size,
+                                   kWorkOrderMessage);
+  free(serialized_bytes);
+
+  const tmb::client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(work_order_message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_client_id;
+}
+
+// Writes the recorded per-work-order profiling results of the given query to
+// 'out' as CSV: a header line followed by one line per recorded work order.
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>> &recorded_times =
+      policy_enforcer_->getProfilingResults(query_id);
+  std::fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    // Note: Index of the "worker thread index" in the tuple is 0.
+    const std::size_t worker_id = std::get<0>(workorder_entry);
+    // All values are std::size_t, so use the 'z' length modifier: "%lu" is
+    // only correct where size_t happens to be unsigned long.
+    std::fprintf(out,
+                 "%zu,%zu,%zu,%zu\n",
+                 query_id,
+                 worker_id,
+                 std::get<1>(workorder_entry),   // Operator ID.
+                 std::get<2>(workorder_entry));  // Time.
+  }
+}
+
+// Records a newly registered Shiftboss in the directory and replies to it with
+// a ShiftbossRegistrationResponseMessage.
+//
+// shiftboss_client_id: TMB client ID of the registering Shiftboss.
+// work_order_capacity: the capacity reported in its registration message.
+void ForemanDistributed::processShiftbossRegisterationMessage(const client_id shiftboss_client_id,
+                                                              const std::size_t work_order_capacity) {
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  // The response is sent with no fields set.
+  S::ShiftbossRegistrationResponseMessage proto;
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationResponseMessage);
+  free(proto_bytes);
+
+  // Check the send status, as sendWorkOrderMessage() does, instead of silently
+  // dropping a failed response.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_client_id;
+
+  // Log only once the message has actually been handed to the bus.
+  LOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+            << kShiftbossRegistrationResponseMessage
+            << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+}
+
+// Notifies the CLI that a query finished by sending it a
+// QueryExecutionSuccessMessage carrying the proto of the result relation.
+//
+// cli_id: TMB client ID of the CLI to notify.
+// result_relation_id: ID of the query result relation in the catalog.
+void ForemanDistributed::processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  // Look the relation up first and fail loudly if it is missing, rather than
+  // dereferencing an unchecked pointer.
+  const CatalogRelation *result_relation =
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id);
+  CHECK(result_relation != nullptr)
+      << "Query result relation with ID " << result_relation_id << " was not found in the catalog";
+
+  S::QueryExecutionSuccessMessage proto;
+  proto.mutable_result_relation()->MergeFrom(result_relation->getProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryExecutionSuccessMessage);
+  free(proto_bytes);
+
+  // Notify the CLI regarding the query result, checking the send status as the
+  // other send paths in this file do.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         cli_id,
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to CLI with TMB client ID " << cli_id;
+
+  LOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+            << "') to CLI with TMB client id " << cli_id;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..8a4a97c
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ * policy enforcer and dispatches the work to Shiftbosses. It also
+ * receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        be profiled or not.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+   **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to
+   *        the Shiftboss.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified Shiftboss.
+   *
+   * @param shiftboss_index The logical index of the recipient Shiftboss in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t shiftboss_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  /**
+   * @brief Handle the registration of a Shiftboss.
+   *
+   * @param shiftboss_client_id The TMB client ID of the registering Shiftboss.
+   * @param work_order_capacity The work order capacity reported by the
+   *        Shiftboss.
+   **/
+  void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id,
+                                            const std::size_t work_order_capacity);
+
+  /**
+   * @brief Handle the response to a request to save a query result.
+   *
+   * @param cli_id The TMB client ID of the CLI that issued the query.
+   * @param result_relation_id The ID of the query result relation.
+   **/
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   *
+   * @return Whether new messages can be collected.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  ShiftbossDirectory shiftboss_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index d16a502..a28fa3b 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -131,8 +131,11 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
default:
LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
}
+
if (admitted_queries_[query_id]->queryStatus(op_index) ==
QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+ onQueryCompletion(admitted_queries_[query_id]->query_handle());
+
removeQuery(query_id);
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 0482ebc..1de0677 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -148,6 +148,13 @@ class PolicyEnforcerBase {
void recordTimeForWorkOrder(
const serialization::NormalWorkOrderCompletionMessage &proto);
+ /**
+ * @brief Add custom actions upon the completion of a query.
+ *
+ * @note The default implementation does nothing.
+ *
+ * @param query_handle The query handle.
+ **/
+ virtual void onQueryCompletion(QueryHandle *query_handle) {}
+
CatalogDatabaseLite *catalog_database_;
const bool profile_individual_workorders_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..59df3de
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+// Command-line flag (default 20) read by getWorkOrderMessages(), which divides
+// it by the number of admitted queries to obtain each query's share.
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+ " can be allocated in a single round of dispatch of messages to"
+ " the workers.");
+
+// Collects the next batch of WorkOrderMessages from the admitted queries into
+// 'work_order_messages' (which must be empty on entry), then completes and
+// removes every query found to have finished execution.
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(work_order_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  // Split the per-round budget evenly, but always let each query contribute at
+  // least one message: if the flag is smaller than the number of admitted
+  // queries the integer division yields zero, and a zero share would collect
+  // nothing at all.
+  const std::size_t even_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  const std::size_t per_query_share = (even_share > 0u) ? even_share : 1u;
+
+  vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      S::WorkOrderMessage *next_work_order_message =
+          static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+      if (next_work_order_message != nullptr) {
+        ++messages_collected_curr_query;
+        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+      } else {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+
+  // Finished queries are removed only after the iteration above, so the map is
+  // not modified while it is being traversed.
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    onQueryCompletion(admitted_queries_[finished_qid]->query_handle());
+    removeQuery(finished_qid);
+  }
+}
+
+// Tries to admit the given query. Returns true if it was admitted; returns
+// false if it was queued for later or if a query with the same ID is already
+// admitted.
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+
+  // Ok to admit the query, provided its ID is not already in use.
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  // NOTE(zuyu): Should call before constructing a 'QueryManager'.
+  // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+  // initializes.
+  initiateQueryInShiftboss(query_handle);
+
+  admitted_queries_[query_id].reset(
+      new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+  return true;
+}
+
+// Handles an InitiateRebuildResponseMessage: forwards it to the query's
+// manager, adds the rebuild work orders to the Shiftboss's queued count, and,
+// if the query has finished, completes it and admits the earliest waiting query.
+void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
+  S::InitiateRebuildResponseMessage response;
+  CHECK(response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  const std::size_t query_id = response.query_id();
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManagerBase *manager = admitted_queries_[query_id].get();
+
+  const std::size_t rebuild_work_order_count = response.num_rebuild_work_orders();
+  manager->processInitiateRebuildResponseMessage(response.operator_index(), rebuild_work_order_count);
+  shiftboss_directory_->addNumQueuedWorkOrders(response.shiftboss_index(), rebuild_work_order_count);
+
+  if (!manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+    // The query still has work left; nothing more to do for now.
+    return;
+  }
+
+  onQueryCompletion(manager->query_handle());
+  removeQuery(query_id);
+
+  if (waiting_queries_.empty()) {
+    return;
+  }
+
+  // Admit the earliest waiting query.
+  QueryHandle *next_query = waiting_queries_.front();
+  waiting_queries_.pop();
+  admitQuery(next_query);
+}
+
+// Sends a QueryInitiateMessage for the given query to Shiftboss 0 and then
+// receives the next message from the bus, which is expected to be the
+// matching QueryInitiateResponseMessage.
+void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
+  // The request carries the query ID, the catalog database cache and the
+  // query context.
+  S::QueryInitiateMessage request_proto;
+  request_proto.set_query_id(query_handle->query_id());
+  request_proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto());
+  request_proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto());
+
+  const size_t request_size = request_proto.ByteSize();
+  char *request_bytes = static_cast<char*>(malloc(request_size));
+  CHECK(request_proto.SerializeToArray(request_bytes, request_size));
+
+  TaggedMessage request(static_cast<const void*>(request_bytes),
+                        request_size,
+                        kQueryInitiateMessage);
+  free(request_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+            << "') to Shiftboss 0";
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(request));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+
+  // Wait Shiftboss for QueryInitiateResponseMessage.
+  const tmb::AnnotatedMessage reply = bus_->Receive(foreman_client_id_, 0, true);
+  const TaggedMessage &reply_message = reply.tagged_message;
+  DCHECK_EQ(kQueryInitiateResponseMessage, reply_message.message_type());
+  LOG(INFO) << "PolicyEnforcerDistributed received typed '" << reply_message.message_type()
+            << "' message from client " << reply.sender;
+
+  // The response is only validated by parsing it; its contents are not used.
+  S::QueryInitiateResponseMessage response_proto;
+  CHECK(response_proto.ParseFromArray(reply_message.message(), reply_message.message_bytes()));
+}
+
+// Runs when a query completes. Without a result relation, the CLI is notified
+// directly with a QueryExecutionSuccessMessage; otherwise Shiftboss 0 is asked
+// to save the query result via a SaveQueryResultMessage.
+void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
+  const CatalogRelation *result_relation = query_handle->getQueryResultRelation();
+  if (result_relation == nullptr) {
+    // TODO(zuyu): notify Shiftboss to remove QueryContext.
+    TaggedMessage success_message(kQueryExecutionSuccessMessage);
+
+    const tmb::client_id cli_id = query_handle->getClientId();
+
+    // Notify the CLI regarding the query execution result.
+    LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+              << "') to CLI with TMB client id " << cli_id;
+    const tmb::MessageBus::SendStatus cli_send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           foreman_client_id_,
+                                           cli_id,
+                                           move(success_message));
+    CHECK(cli_send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+        << " to CLI with TMB client ID " << cli_id;
+    return;
+  }
+
+  // SaveQueryResultMessage implies QueryContext clean up in Shiftboss.
+  S::SaveQueryResultMessage save_request;
+  save_request.set_query_id(query_handle->query_id());
+  save_request.set_relation_id(result_relation->getID());
+
+  // List every block of the result relation, taken from a snapshot.
+  const vector<block_id> result_blocks(result_relation->getBlocksSnapshot());
+  for (auto block_it = result_blocks.begin(); block_it != result_blocks.end(); ++block_it) {
+    save_request.add_blocks(*block_it);
+  }
+
+  save_request.set_cli_id(query_handle->getClientId());
+
+  const size_t request_size = save_request.ByteSize();
+  char *request_bytes = static_cast<char*>(malloc(request_size));
+  CHECK(save_request.SerializeToArray(request_bytes, request_size));
+
+  TaggedMessage save_message(static_cast<const void*>(request_bytes),
+                             request_size,
+                             kSaveQueryResultMessage);
+  free(request_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+            << "') to Shiftboss 0";
+  // TODO(zuyu): Support multiple shiftbosses.
+  const tmb::MessageBus::SendStatus shiftboss_send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(save_message));
+  CHECK(shiftboss_send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..8b07748
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb {
+class MessageBus;
+class TaggedMessage;
+}
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ * in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+ /**
+ * @brief Constructor.
+ * @param foreman_client_id The TMB client ID of the Foreman.
+ * @param catalog_database The CatalogDatabase used.
+ * @param shiftboss_directory The ShiftbossDirectory used.
+ * @param bus The TMB.
+ **/
+ PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+ CatalogDatabaseLite *catalog_database,
+ ShiftbossDirectory *shiftboss_directory,
+ tmb::MessageBus *bus,
+ const bool profile_individual_workorders = false)  // Passed through to PolicyEnforcerBase.
+ : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+ foreman_client_id_(foreman_client_id),
+ shiftboss_directory_(shiftboss_directory),
+ bus_(bus) {}
+
+ /**
+ * @brief Destructor.
+ **/
+ ~PolicyEnforcerDistributed() override {}
+
+ bool admitQuery(QueryHandle *query_handle) override;
+
+ /**
+ * @brief Get work order messages to be dispatched. These messages come from
+ * the active queries.
+ *
+ * @param work_order_messages The work order messages to be dispatched.
+ **/
+ void getWorkOrderMessages(
+ std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+ /**
+ * @brief Process the initiate rebuild work order response message.
+ *
+ * @param tagged_message The message.
+ **/
+ void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
+
+ private:
+ // Delegates to the ShiftbossDirectory entry at 'shiftboss_index'.
+ void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
+ shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+ }
+
+ void onQueryCompletion(QueryHandle *query_handle) override;
+
+ void initiateQueryInShiftboss(QueryHandle *query_handle);
+
+ const tmb::client_id foreman_client_id_;
+
+ ShiftbossDirectory *shiftboss_directory_;
+
+ tmb::MessageBus *bus_;
+
+ DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..99de75c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -111,15 +111,27 @@ message InitiateRebuildResponseMessage {
required uint64 query_id = 1;
required uint64 operator_index = 2;
required uint64 num_rebuild_work_orders = 3;
+ required uint64 shiftboss_index = 4;
}
message SaveQueryResultMessage {
- required int32 relation_id = 1;
- repeated fixed64 blocks = 2 [packed=true];
+ required uint64 query_id = 1;
+ required int32 relation_id = 2;
+ repeated fixed64 blocks = 3 [packed=true];
+
+ // Defined in "tmb/id_typedefs.h".
+ required uint32 cli_id = 4;
}
message SaveQueryResultResponseMessage {
required int32 relation_id = 1;
+
+ // Defined in "tmb/id_typedefs.h".
+ required uint32 cli_id = 2;
+}
+
+message QueryExecutionSuccessMessage {
+ optional CatalogRelationSchema result_relation = 1;
}
// BlockLocator related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index b67209f..0d43237 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -84,6 +84,10 @@ enum QueryExecutionMessageType : message_type_id {
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
+ // From Foreman to CLI.
+ kQueryExecutionSuccessMessage,
+ kQueryExecutionErrorMessage,
+
// BlockLocator related messages, sorted in a life cycle of StorageManager
// with a unique block domain.
kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index d2a3341..4ee51c3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -35,7 +35,8 @@ using std::pair;
namespace quickstep {
QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
- : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+ : query_handle_(query_handle),
+ query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
query_dag_(DCHECK_NOTNULL(
DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
num_operators_in_dag_(query_dag_->size()),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..3338478 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,6 +24,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -79,6 +80,13 @@ class QueryManagerBase {
}
/**
+ * @brief Get the query handle.
+ **/
+ inline QueryHandle* query_handle() const {
+ return query_handle_;
+ }
+
+ /**
* @brief Process the received WorkOrder complete message.
*
* @param op_index The index of the specified operator node in the query DAG
@@ -128,6 +136,20 @@ class QueryManagerBase {
void processFeedbackMessage(const dag_node_index op_index,
const WorkOrder::FeedbackMessage &message);
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Process the initiate rebuild work order response message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for initiating the rebuild work order.
+ * @param num_rebuild_work_orders The number of the rebuild work orders
+ * generated for the operator indexed by 'op_index'.
+ * @note The implementation declared here has an empty body.
+ **/
+ virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) {}
+#endif // QUICKSTEP_DISTRIBUTED
+
/**
* @brief Get the query status after processing an incoming message.
*
@@ -250,9 +272,11 @@ class QueryManagerBase {
return query_exec_state_->hasRebuildInitiated(index);
}
+ QueryHandle *query_handle_; // Owned by the optimizer.
+
const std::size_t query_id_;
- DAG<RelationalOperator, bool> *query_dag_;
+ DAG<RelationalOperator, bool> *query_dag_; // Owned by 'query_handle_'.
const dag_node_index num_operators_in_dag_;
// For all nodes, store their receiving dependents.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..bed3e45 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -32,6 +32,7 @@
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
using std::free;
using std::malloc;
@@ -42,11 +43,11 @@ using std::unique_ptr;
namespace quickstep {
QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
- ShiftbossDirectory *shiftbosses,
+ ShiftbossDirectory *shiftboss_directory,
const tmb::client_id foreman_client_id,
tmb::MessageBus *bus)
: QueryManagerBase(query_handle),
- shiftbosses_(shiftbosses),
+ shiftboss_directory_(shiftboss_directory),
foreman_client_id_(foreman_client_id),
bus_(bus),
normal_workorder_protos_container_(
@@ -119,6 +120,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
return generated_new_workorder_protos;
}
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) {
+ // TODO(zuyu): Multiple workers support.
+ query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+
+ if (num_rebuild_work_orders != 0u) {
+ // Wait for the rebuild work orders to finish.
+ return;
+ }
+
+ markOperatorFinished(op_index);  // Reached only when there are no rebuild work orders.
+
+ for (const std::pair<dag_node_index, bool> &dependent_link :
+ query_dag_->getDependents(op_index)) {
+ const dag_node_index dependent_op_index = dependent_link.first;
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+ processOperator(dependent_op_index, true);
+ }
+ }
+}
+
bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
DCHECK(checkRebuildRequired(index));
DCHECK(!checkRebuildInitiated(index));
@@ -127,6 +149,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId);
serialization::InitiateRebuildMessage proto;
+ proto.set_query_id(query_id_);
proto.set_operator_index(index);
proto.set_insert_destination_index(op.getInsertDestinationID());
proto.set_relation_id(op.getOutputRelationID());
@@ -140,13 +163,17 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
kInitiateRebuildMessage);
free(proto_bytes);
- LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
+ LOG(INFO) << "QueryManagerDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
<< "') to Shiftboss";
// TODO(zuyu): Multiple workers support.
- QueryExecutionUtil::SendTMBMessage(bus_,
- foreman_client_id_,
- shiftbosses_->getClientId(0),
- move(tagged_msg));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(tagged_msg));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
// The negative value indicates that the number of rebuild work orders is to be
// determined.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..9a3f44b 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
+#include <cstddef>
#include <memory>
#include "query_execution/QueryExecutionState.hpp"
@@ -47,12 +48,12 @@ class QueryManagerDistributed final : public QueryManagerBase {
* @brief Constructor.
*
* @param query_handle The QueryHandle object for this query.
- * @param shiftbosses The ShiftbossDirectory to use.
+ * @param shiftboss_directory The ShiftbossDirectory to use.
* @param foreman_client_id The TMB client ID of the foreman thread.
* @param bus The TMB used for communication.
**/
QueryManagerDistributed(QueryHandle *query_handle,
- ShiftbossDirectory *shiftbosses,
+ ShiftbossDirectory *shiftboss_directory,
const tmb::client_id foreman_client_id,
tmb::MessageBus *bus);
@@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
bool fetchNormalWorkOrders(const dag_node_index index) override;
+ void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+ const std::size_t num_rebuild_work_orders) override;
+
/**
* @brief Get the next normal workorder to be excuted, wrapped in a
* WorkOrderMessage proto.
@@ -88,7 +92,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
(query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
- ShiftbossDirectory *shiftbosses_;
+ ShiftbossDirectory *shiftboss_directory_;
const tmb::client_id foreman_client_id_;
tmb::MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 7f655c6..925dc1f 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -113,10 +113,14 @@ void Shiftboss::run() {
<< "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
<< "') from Foreman to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
break;
}
case kInitiateRebuildMessage: {
@@ -143,10 +147,14 @@ void Shiftboss::run() {
<< "' message from worker (client " << annotated_message.sender << ") to Foreman";
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(annotated_message.tagged_message));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(annotated_message.tagged_message));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kSaveQueryResultMessage: {
@@ -167,8 +175,11 @@ void Shiftboss::run() {
}
}
+ query_contexts_.erase(proto.query_id());
+
serialization::SaveQueryResultResponseMessage proto_response;
proto_response.set_relation_id(proto.relation_id());
+ proto_response.set_cli_id(proto.cli_id());
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
@@ -182,10 +193,14 @@ void Shiftboss::run() {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
<< "') to Foreman";
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
case kPoisonMessage: {
@@ -196,7 +211,7 @@ void Shiftboss::run() {
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
- tmb::MessageBus::SendStatus send_status =
+ const tmb::MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_,
worker_addresses_,
broadcast_style,
@@ -249,7 +264,7 @@ void Shiftboss::registerWithForeman() {
kShiftbossRegistrationMessage);
free(proto_bytes);
- tmb::MessageBus::SendStatus send_status =
+ const tmb::MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
@@ -268,10 +283,6 @@ void Shiftboss::processQueryInitiateMessage(
bus_));
query_contexts_.emplace(query_id, move(query_context));
- LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
- << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
- << "') to Foreman";
-
serialization::QueryInitiateResponseMessage proto;
proto.set_query_id(query_id);
@@ -284,10 +295,18 @@ void Shiftboss::processQueryInitiateMessage(
kQueryInitiateResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+ << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+ << "') to Foreman";
+
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
}
void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -311,6 +330,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
proto.set_query_id(query_id);
proto.set_operator_index(op_index);
proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
+ // TODO(zuyu): Multiple Shiftboss support.
+ proto.set_shiftboss_index(0);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -321,10 +342,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
kInitiateRebuildResponseMessage);
free(proto_bytes);
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- foreman_client_id_,
- move(message_response));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ foreman_client_id_,
+ move(message_response));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Foreman with TMB client ID " << foreman_client_id_;
for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
// NOTE(zuyu): Worker releases the memory after the execution of
@@ -349,10 +374,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
<< "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
<< "') to worker " << worker_index;
- QueryExecutionUtil::SendTMBMessage(bus_,
- shiftboss_client_id_,
- workers_->getClientID(worker_index),
- move(worker_tagged_message));
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ shiftboss_client_id_,
+ workers_->getClientID(worker_index),
+ move(worker_tagged_message));
+ DCHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_
+ << " to Worker with TMB client ID " << workers_->getClientID(worker_index);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a56b714..b6b97a0 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -212,6 +212,10 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_QueryPlan
quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryoptimizer_QueryHandle
+ tmb)
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryoptimizer_QueryPlan
quickstep_relationaloperators_RelationalOperator
quickstep_utility_DAG
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..bbf1918 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -24,9 +24,14 @@
#include "catalog/Catalog.pb.h"
#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
#include "query_optimizer/QueryPlan.hpp"
#include "utility/Macros.hpp"
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/id_typedefs.h"
+#endif // QUICKSTEP_DISTRIBUTED
+
namespace quickstep {
class CatalogRelation;
@@ -119,6 +124,22 @@ class QueryHandle {
query_result_relation_ = relation;
}
+#ifdef QUICKSTEP_DISTRIBUTED
+ /**
+ * @brief Get the client id.
+ */
+ tmb::client_id getClientId() const {
+ return cli_id_;
+ }
+
+ /**
+ * @brief Set the client id.
+ */
+ void setClientId(const tmb::client_id cli_id) {
+ cli_id_ = cli_id;
+ }
+#endif // QUICKSTEP_DISTRIBUTED
+
private:
const std::size_t query_id_;
const std::uint64_t query_priority_;
@@ -134,6 +155,11 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+#ifdef QUICKSTEP_DISTRIBUTED
+ // The client id of the CLI which sends the query; kClientIdNone until setClientId() is called.
+ tmb::client_id cli_id_ = tmb::kClientIdNone;
+#endif // QUICKSTEP_DISTRIBUTED
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 9cad47f..6522117 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -78,6 +78,14 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_utility_Macros
tmb)
+if (ENABLE_DISTRIBUTED)
+ add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+ DistributedExecutionGeneratorTest.cpp
+ DistributedExecutionGeneratorTestRunner.cpp
+ DistributedExecutionGeneratorTestRunner.hpp
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+ "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
ExecutionGeneratorTest.cpp
ExecutionGeneratorTestRunner.cpp
@@ -107,6 +115,39 @@ add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
"${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
+ glog
+ gtest
+ gtest_main
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogTypedefs
+ quickstep_cli_DropRelation
+ quickstep_cli_PrintToScreen
+ quickstep_parser_ParseStatement
+ quickstep_parser_SqlParserWrapper
+ quickstep_queryexecution_ForemanDistributed
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_Shiftboss
+ quickstep_queryexecution_Worker
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryexecution_WorkerMessage
+ quickstep_queryoptimizer_ExecutionGenerator
+ quickstep_queryoptimizer_LogicalGenerator
+ quickstep_queryoptimizer_OptimizerContext
+ quickstep_queryoptimizer_PhysicalGenerator
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_tests_TestDatabaseLoader
+ quickstep_utility_Macros
+ quickstep_utility_MemStream
+ quickstep_utility_SqlError
+ quickstep_utility_TextBasedTestDriver
+ tmb
+ ${LIBS})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
${GFLAGS_LIB_NAME}
glog
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
new file mode 100644
index 0000000..fc0c67d
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <fstream>
+#include <memory>
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTest.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
+
+int main(int argc, char** argv) {
+ google::InitGoogleLogging(argv[0]);
+ // Honor FLAGS_buffer_pool_slots in StorageManager.
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ if (argc < 4) {  // argv[1], argv[2] and argv[3] are all used below, so abort if any is missing.
+ LOG(FATAL) << "Must have at least 3 arguments, but " << argc - 1
+ << " are provided";
+ }
+
+ std::ifstream input_file(argv[1]);  // argv[1]: the test input file.
+ CHECK(input_file.is_open()) << argv[1];
+ std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
+ test_runner(
+ new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
+ test_driver.reset(
+ new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+ test_driver->registerOption(
+ quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ const int success = RUN_ALL_TESTS();
+ if (success != 0) {
+ test_driver->writeActualOutputToFile(argv[2]);  // argv[2]: where the actual output goes on failure.
+ }
+
+ return success;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
new file mode 100644
index 0000000..ffed4f0
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+
+#include <cstdio>
+#include <set>
+#include <string>
+
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/ExecutionGenerator.hpp"
+#include "query_optimizer/LogicalGenerator.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/PhysicalGenerator.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace optimizer {
+
+const char *DistributedExecutionGeneratorTestRunner::kResetOption =
+ "reset_before_execution";
+
+void DistributedExecutionGeneratorTestRunner::runTestCase(
+ const std::string &input, const std::set<std::string> &options,
+ std::string *output) {
+ // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+ VLOG(4) << "Test SQL(s): " << input;
+
+ if (options.find(kResetOption) != options.end()) {
+ test_database_loader_.clear();
+ test_database_loader_.createTestRelation(false /* allow_vchar */);
+ test_database_loader_.loadTestRelation();
+ }
+
+ MemStream output_stream;
+ sql_parser_.feedNextBuffer(new std::string(input));
+
+ while (true) {
+ ParseResult result = sql_parser_.getNextStatement();
+
+ OptimizerContext optimizer_context(query_id_++,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager());
+
+ if (result.condition != ParseResult::kSuccess) {
+ if (result.condition == ParseResult::kError) {
+ *output = result.error_message;
+ }
+ break;
+ }
+
+ std::printf("%s\n", result.parsed_statement->toString().c_str());
+ try {
+ QueryHandle query_handle(optimizer_context.query_id());
+ LogicalGenerator logical_generator(&optimizer_context);
+ PhysicalGenerator physical_generator;
+ ExecutionGenerator execution_generator(&optimizer_context,
+ &query_handle);
+
+ const physical::PhysicalPtr physical_plan =
+ physical_generator.generatePlan(
+ logical_generator.generatePlan(*result.parsed_statement));
+ execution_generator.generatePlan(physical_plan);
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ cli_id_,
+ foreman_->getBusClientID(),
+ &query_handle,
+ &bus_);
+
+ const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+ DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+ const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
+ if (query_result_relation) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ test_database_loader_.storage_manager(),
+ output_stream.file());
+ DropRelation::Drop(*query_result_relation,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager());
+ }
+ } catch (const SqlError &error) {
+ *output = error.formatMessage(input);
+ break;
+ }
+ }
+
+ if (output->empty()) {
+ *output = output_stream.str();
+ }
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
new file mode 100644
index 0000000..cd59596
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -0,0 +1,146 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/**
+ * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
+ * distributed version.
+ *
+ * The runner owns one message bus, one ForemanDistributed, one Shiftboss and
+ * a single Worker, plus a bus client (cli_id_) that plays the role of the
+ * CLI. The constructor creates and starts these components; the destructor
+ * sends a poison message from the CLI client to the Foreman and then joins
+ * the Worker, Shiftboss and Foreman threads, in that order.
+ * NOTE(review): the shutdown presumably relies on the Foreman relaying the
+ * poison message to the Shiftboss and Worker -- confirm in ForemanDistributed.
+ */
+class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
+ public:
+ /**
+ * @brief If this option is enabled, recreate the entire database and
+ * repopulate the data before every test.
+ */
+ static const char *kResetOption;
+
+ /**
+ * @brief Constructor.
+ *
+ * Creates and loads the test relation, initializes the bus, and constructs
+ * the Foreman, one Worker (thread index 0), a WorkerDirectory that maps that
+ * worker to kAnyNUMANodeID, and the Shiftboss. It then connects the CLI
+ * client, registering it as a sender of kAdmitRequestMessage and
+ * kPoisonMessage and as a receiver of kQueryExecutionSuccessMessage, and
+ * finally starts the Foreman, Shiftboss and Worker threads.
+ *
+ * @param storage_path Path handed to the TestDatabaseLoader.
+ */
+ explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path)
+ : query_id_(0),
+ test_database_loader_(storage_path) {
+ test_database_loader_.createTestRelation(false /* allow_vchar */);
+ test_database_loader_.loadTestRelation();
+
+ bus_.Initialize();
+
+ // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+ // could receive a registration message from the latter.
+ foreman_.reset(new ForemanDistributed(&bus_, test_database_loader_.catalog_database()));
+
+ worker_.reset(new Worker(0 /* worker_thread_index */, &bus_));
+
+ std::vector<tmb::client_id> worker_client_ids;
+ worker_client_ids.push_back(worker_->getBusClientID());
+
+ // We don't use the NUMA aware version of worker code.
+ const std::vector<numa_node_id> numa_nodes(worker_client_ids.size(), kAnyNUMANodeID);
+
+ workers_.reset(
+ new WorkerDirectory(worker_client_ids.size(), worker_client_ids, numa_nodes));
+
+ shiftboss_.reset(new Shiftboss(&bus_, test_database_loader_.storage_manager(), workers_.get()));
+
+ cli_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+ bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+ foreman_->start();
+
+ shiftboss_->start();
+ worker_->start();
+ }
+
+ ~DistributedExecutionGeneratorTestRunner() {
+ std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+ tmb::TaggedMessage poison_tagged_message(poison_message.get(),
+ sizeof(*poison_message),
+ quickstep::kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(
+ &bus_,
+ cli_id_,
+ foreman_->getBusClientID(),
+ std::move(poison_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+ worker_->join();
+ shiftboss_->join();
+
+ foreman_->join();
+ }
+
+ void runTestCase(const std::string &input,
+ const std::set<std::string> &options,
+ std::string *output) override;
+
+ private:
+ std::size_t query_id_;
+
+ SqlParserWrapper sql_parser_;
+ TestDatabaseLoader test_database_loader_;
+
+ MessageBusImpl bus_;
+
+ tmb::client_id cli_id_;
+
+ std::unique_ptr<ForemanDistributed> foreman_;
+
+ std::unique_ptr<Worker> worker_;
+ std::unique_ptr<WorkerDirectory> workers_;
+
+ std::unique_ptr<Shiftboss> shiftboss_;
+
+ DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
+};
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 56bae16..cd0e626 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -13,6 +13,61 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+# Distributed variants of the execution generator tests.
+#
+# Every test here launches the DistributedExecutionGeneratorTest binary and is
+# given the shared input script from the source tree, a Distributed<Name>.test
+# path in the binary tree, and a Distributed<Name>/ directory. The last two
+# are distinct from the paths passed to the single-node test that consumes the
+# same input, so the two variants cannot clobber each other when ctest runs
+# jobs in parallel.
+#
+# Registration is guarded by ENABLE_DISTRIBUTED.
+# NOTE(review): assumes the DistributedExecutionGeneratorTest executable is
+# only built when ENABLE_DISTRIBUTED is on -- confirm against
+# query_optimizer/tests/CMakeLists.txt.
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
+  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
+endif()
add_test(quickstep_queryoptimizer_tests_executiongenerator_create
"../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
"${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -74,6 +129,17 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -81,4 +147,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/third_party/tmb/include/tmb/tagged_message.h
----------------------------------------------------------------------
diff --git a/third_party/tmb/include/tmb/tagged_message.h b/third_party/tmb/include/tmb/tagged_message.h
index 49dcee7..75b980e 100644
--- a/third_party/tmb/include/tmb/tagged_message.h
+++ b/third_party/tmb/include/tmb/tagged_message.h
@@ -63,6 +63,15 @@ class TaggedMessage {
}
/**
+ * @brief Constructor which creates an empty, typed message.
+ *
+ * The message is marked as using the in-line payload representation with a
+ * payload size of zero, so it carries only its type.
+ *
+ * @param message_type The type ID stored as this message's type.
+ **/
+ explicit TaggedMessage(const message_type_id message_type)
+ : payload_inline_(true),
+ message_type_(message_type) {
+ payload_.in_line.size = 0;
+ }
+
+ /**
* @brief Constructor.
*
* @param msg A pointer to the message contents in memory, which will be
[4/4] incubator-quickstep git commit: Disabled shared build.
Posted by zu...@apache.org.
Disabled shared build.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/738ffe9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/738ffe9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/738ffe9a
Branch: refs/heads/new-distributed-exe-test
Commit: 738ffe9a7adf714b3fe81c840bb5c7888a7372e1
Parents: 0f662a5
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jun 14 20:04:51 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Aug 4 12:03:07 2016 -0700
----------------------------------------------------------------------
.travis.yml | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/738ffe9a/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 3c00cd7..2650b6c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -56,7 +56,6 @@ before_script:
- $PROTOC --version
- (cd build &&
cmake -D CMAKE_BUILD_TYPE=$BUILD_TYPE
- -D BUILD_SHARED_LIBS=On
-D CMAKE_C_FLAGS_DEBUG="$DEBUG_FLAGS"
-D CMAKE_CXX_FLAGS_DEBUG="$DEBUG_FLAGS"
-D CMAKE_C_FLAGS_RELEASE="$RELEASE_FLAGS"
[2/4] incubator-quickstep git commit: Clean up query execution states
in Shiftboss.
Posted by zu...@apache.org.
Clean up query execution states in Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9e158490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9e158490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9e158490
Branch: refs/heads/new-distributed-exe-test
Commit: 9e1584901ee3cd3658c426d8c470ed5aabe046d5
Parents: 220fa06
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Jul 30 10:59:31 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Sat Jul 30 10:59:31 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 1 +
query_execution/PolicyEnforcerDistributed.cpp | 42 +++++++++++++++++-----
query_execution/QueryExecutionMessages.proto | 4 +++
query_execution/QueryExecutionTypedefs.hpp | 2 ++
query_execution/Shiftboss.cpp | 9 +++++
query_execution/Shiftboss.hpp | 3 ++
6 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 1c0fba8..2632113 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -67,6 +67,7 @@ ForemanDistributed::ForemanDistributed(
kQueryInitiateMessage,
kWorkOrderMessage,
kInitiateRebuildMessage,
+ kQueryTeardownMessage,
kSaveQueryResultMessage,
kQueryExecutionSuccessMessage,
kPoisonMessage};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 59df3de..087e0b3 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -196,29 +196,53 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+ const tmb::client_id cli_id = query_handle->getClientId();
+ const std::size_t query_id = query_handle->query_id();
+
if (query_result == nullptr) {
- // TODO(zuyu): notify Shiftboss to remove QueryContext.
- TaggedMessage message(kQueryExecutionSuccessMessage);
+ // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+ serialization::QueryTeardownMessage proto;
+ proto.set_query_id(query_id);
+
+ // Serialize the proto into a temporary heap buffer that is only needed
+ // until the TaggedMessage below has been constructed.
+ const size_t proto_length = proto.ByteSize();
+ char *proto_bytes = static_cast<char*>(malloc(proto_length));
+ CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+ TaggedMessage message(static_cast<const void*>(proto_bytes),
+ proto_length,
+ kQueryTeardownMessage);
+ // Release the temporary buffer; without this, every query that finishes
+ // without a result relation leaks proto_length bytes.
+ // NOTE(review): relies on this TaggedMessage constructor copying the bytes
+ // at 'msg' rather than taking ownership -- confirm in tmb/tagged_message.h.
+ free(proto_bytes);
+
+ // TODO(zuyu): Support multiple shiftbosses.
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+ << "') to Shiftboss 0";
+ tmb::MessageBus::SendStatus send_status =
+ QueryExecutionUtil::SendTMBMessage(bus_,
+ foreman_client_id_,
+ shiftboss_directory_->getClientId(0),
+ move(message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+ << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+ << " to Shiftboss";
- const tmb::client_id cli_id = query_handle->getClientId();
+ TaggedMessage cli_message(kQueryExecutionSuccessMessage);
- // Notify the CLI regarding the query execution result.
+ // Notify the CLI that the query executed successfully.
LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
<< "') to CLI with TMB client id " << cli_id;
- const tmb::MessageBus::SendStatus send_status =
+ send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
cli_id,
- move(message));
+ move(cli_message));
CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
<< "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
<< " to CLI with TMB client ID " << cli_id;
return;
}
- // SaveQueryResultMessage implies QueryContext clean up in Shiftboss.
+ // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
S::SaveQueryResultMessage proto;
- proto.set_query_id(query_handle->query_id());
+ proto.set_query_id(query_id);
proto.set_relation_id(query_result->getID());
const vector<block_id> blocks(query_result->getBlocksSnapshot());
@@ -226,7 +250,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
proto.add_blocks(block);
}
- proto.set_cli_id(query_handle->getClientId());
+ proto.set_cli_id(cli_id);
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 99de75c..b182b87 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -114,6 +114,10 @@ message InitiateRebuildResponseMessage {
required uint64 shiftboss_index = 4;
}
+message QueryTeardownMessage {
+ required uint64 query_id = 1;
+}
+
message SaveQueryResultMessage {
required uint64 query_id = 1;
required int32 relation_id = 2;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0d43237..5706c82 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,8 @@ enum QueryExecutionMessageType : message_type_id {
kInitiateRebuildMessage, // From Foreman to Shiftboss.
kInitiateRebuildResponseMessage, // From Shiftboss to Foreman.
+ kQueryTeardownMessage, // From Foreman to Shiftboss.
+
kSaveQueryResultMessage, // From Foreman to Shiftboss.
kSaveQueryResultResponseMessage, // From Shiftboss to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 925dc1f..978fe34 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -157,6 +157,15 @@ void Shiftboss::run() {
<< " to Foreman with TMB client ID " << foreman_client_id_;
break;
}
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e158490/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9e24d62..3943938 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -115,6 +115,9 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ // Clean up query execution states, i.e., QueryContext.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
+
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
// Stop all workers.
[3/4] incubator-quickstep git commit: CI w/ gRPC to build the
distributed version.
Posted by zu...@apache.org.
CI w/ gRPC to build the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0f662a5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0f662a5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0f662a5a
Branch: refs/heads/new-distributed-exe-test
Commit: 0f662a5a2414b28537124319573862b6aab43977
Parents: 9e15849
Author: Zuyu Zhang <zz...@pivotal.io>
Authored: Mon May 30 11:57:22 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Aug 4 12:03:07 2016 -0700
----------------------------------------------------------------------
.travis.yml | 26 ++++++++++++++++++--------
1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f662a5a/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 54a0c8a..3c00cd7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,4 @@
-# NOTE(quickstep-team): In Travis-CI, jobs timeout if they take more than 120
+# NOTE(quickstep-team): In Travis-CI, jobs timeout if they take more than 50
# mins or if there is no log output for more than 10 mins. Hence, we use -O0 to
# speed up compilation in release build. Also, jobs can only use up to 20GB of
# disk space. Hence, we minimize the amount of debug symbol using -g0 (DEBUG
@@ -15,13 +15,13 @@ compiler:
- clang
env:
- - BUILD_TYPE=Debug VECTOR_COPY_ELISION_LEVEL=joinwithbinaryexpressions
- - BUILD_TYPE=Release VECTOR_COPY_ELISION_LEVEL=joinwithbinaryexpressions
- BUILD_TYPE=Debug VECTOR_COPY_ELISION_LEVEL=selection
- BUILD_TYPE=Release VECTOR_COPY_ELISION_LEVEL=selection
+ - BUILD_TYPE=Debug VECTOR_COPY_ELISION_LEVEL=none
+ - BUILD_TYPE=Release VECTOR_COPY_ELISION_LEVEL=none
install:
- - if [ "$CC" = "gcc" ]; then
+ - if [ "$VECTOR_COPY_ELISION_LEVEL" = "selection" ] && [ "$CC" = "gcc" ]; then
export MAKE_JOBS=1;
else
export MAKE_JOBS=2;
@@ -38,11 +38,22 @@ install:
- export DEBUG_FLAGS="-g0";
- export RELEASE_FLAGS="-O0 -DNDEBUG";
- export LINKER_FLAGS="-s"
+ # Protobuf 3 beta.
+ - wget https://github.com/google/protobuf/releases/download/v3.0.0-beta-3/protobuf-cpp-3.0.0-beta-3.tar.gz
+ - tar -xzvf protobuf-cpp-3.0.0-beta-3.tar.gz
+ - pushd protobuf-3.0.0-beta-3 && ./configure --prefix=/usr && make && sudo make install && popd
+ - rm -rf protobuf*
+ - export PROTOC=`which protoc`
+ # gRPC.
+ - git clone https://github.com/grpc/grpc.git
+ - pushd grpc && git checkout release-0_14_1 && git submodule update --init && make && sudo make install && popd
+ - rm -rf grpc
before_script:
- $CC --version
- $CXX --version
- $CLINKER --version
+ - $PROTOC --version
- (cd build &&
cmake -D CMAKE_BUILD_TYPE=$BUILD_TYPE
-D BUILD_SHARED_LIBS=On
@@ -55,14 +66,15 @@ before_script:
-D CMAKE_CXX_COMPILER=$CXX
-D CMAKE_LINKER=$CLINKER
-D USE_TCMALLOC=0
- -D VECTOR_COPY_ELISION_LEVEL=$VECTOR_COPY_ELISION_LEVEL ..)
+ -D VECTOR_COPY_ELISION_LEVEL=$VECTOR_COPY_ELISION_LEVEL
+ -D ENABLE_DISTRIBUTED=ON ..)
script:
- ./third_party/cpplint/lint_everything.py
- ./validate_cmakelists.py
- ./cyclic_dependency.py
- (cd build && make -j$MAKE_JOBS)
- - (cd build && ctest --output-on-failure -j$TEST_JOBS)
+ - (cd build && ctest -E native_net_client_message_bus_unittest --output-on-failure -j$TEST_JOBS)
after_failure:
- df -h
@@ -84,8 +96,6 @@ addons:
- g++-5
- clang-3.7
- binutils-gold
- - libprotobuf-dev
- - protobuf-compiler
- libgtest-dev
- python-networkx
- libnuma-dev