You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/19 02:18:14 UTC
[1/6] incubator-quickstep git commit: Added ForemanDistributed.
Repository: incubator-quickstep
Updated Branches:
refs/heads/LIP-for-tpch-merged [created] f4af59652
Added ForemanDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1111ec58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1111ec58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1111ec58
Branch: refs/heads/LIP-for-tpch-merged
Commit: 1111ec585be350236ac7631cd1883b1e74c28af6
Parents: 47d1248
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:37:59 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 24 ++
query_execution/ForemanDistributed.cpp | 335 ++++++++++++++++++++++++++++
query_execution/ForemanDistributed.hpp | 130 +++++++++++
3 files changed, 489 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index c7b9d61..f0c988e 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+# The distributed Foreman is only built when ENABLE_DISTRIBUTED is on.
+if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
if (ENABLE_DISTRIBUTED)
@@ -86,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+# Link dependencies of ForemanDistributed (distributed builds only).
+# NOTE(review): neither ForemanDistributed.cpp nor ForemanDistributed.hpp
+# includes a gflags header; confirm whether ${GFLAGS_LIB_NAME} is needed.
+if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_ForemanDistributed
+ glog
+ quickstep_catalog_CatalogDatabase
+ quickstep_catalog_CatalogRelation
+ quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_Catalog_proto
+ quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_ForemanBase
+ quickstep_queryexecution_PolicyEnforcerDistributed
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_queryexecution_ShiftbossDirectory
+ quickstep_threading_ThreadUtil
+ quickstep_utility_EqualsAnyConstant
+ quickstep_utility_Macros
+ tmb
+ ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanSingleNode
glog
quickstep_queryexecution_AdmitRequestMessage
@@ -317,6 +340,7 @@ target_link_libraries(quickstep_queryexecution
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_ForemanDistributed
quickstep_queryexecution_PolicyEnforcerDistributed
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..29f5b9b
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,335 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+class QueryHandle;
+
+/**
+ * Registers the Foreman's TMB client for every message type it sends or
+ * handles, then creates the distributed policy enforcer that shares this
+ * Foreman's client ID, catalog, Shiftboss directory and bus.
+ **/
+ForemanDistributed::ForemanDistributed(
+    MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  // Every message type this Foreman may put on the bus.
+  for (const QueryExecutionMessageType outgoing : {kShiftbossRegistrationResponseMessage,
+                                                   kQueryInitiateMessage,
+                                                   kWorkOrderMessage,
+                                                   kInitiateRebuildMessage,
+                                                   kQueryTeardownMessage,
+                                                   kSaveQueryResultMessage,
+                                                   kQueryExecutionSuccessMessage,
+                                                   kPoisonMessage}) {
+    bus_->RegisterClientAsSender(foreman_client_id_, outgoing);
+  }
+
+  // Every message type handled by the event loop in run().
+  for (const QueryExecutionMessageType incoming : {kShiftbossRegistrationMessage,
+                                                   kAdmitRequestMessage,
+                                                   kQueryInitiateResponseMessage,
+                                                   kCatalogRelationNewBlockMessage,
+                                                   kDataPipelineMessage,
+                                                   kInitiateRebuildResponseMessage,
+                                                   kWorkOrderCompleteMessage,
+                                                   kRebuildWorkOrderCompleteMessage,
+                                                   kWorkOrderFeedbackMessage,
+                                                   kSaveQueryResultResponseMessage,
+                                                   kPoisonMessage}) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, incoming);
+  }
+
+  policy_enforcer_ = unique_ptr<PolicyEnforcerDistributed>(
+      new PolicyEnforcerDistributed(foreman_client_id_,
+                                    catalog_database_,
+                                    &shiftboss_directory_,
+                                    bus_,
+                                    profile_individual_workorders));
+}
+
+// Foreman thread body: optionally pins the thread to cpu_id_, waits for the
+// first Shiftboss registration, then handles TMB messages in a loop until a
+// kPoisonMessage arrives. On poison it broadcasts kPoisonMessage to every
+// registered Shiftboss and returns.
+void ForemanDistributed::run() {
+ if (cpu_id_ >= 0) {
+ // We can pin the foreman thread to a CPU if specified.
+ ThreadUtil::BindToCPU(cpu_id_);
+ }
+
+ // Block until at least one Shiftboss has registered.
+ // NOTE(review): the message type is only DCHECKed, so a release build would
+ // try to parse any other first message as a ShiftbossRegistrationMessage.
+ if (shiftboss_directory_.empty()) {
+ const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+ DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+ << "' message from client " << annotated_message.sender;
+
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+ DCHECK_EQ(1u, shiftboss_directory_.size());
+ }
+
+ // Event loop
+ for (;;) {
+ // Receive() causes this thread to sleep until next message is received.
+ const AnnotatedMessage annotated_message =
+ bus_->Receive(foreman_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ DLOG(INFO) << "ForemanDistributed received typed '" << message_type
+ << "' message from client " << annotated_message.sender;
+ switch (message_type) {
+ case kShiftbossRegistrationMessage: {
+ // Additional Shiftbosses may register at any time.
+ S::ShiftbossRegistrationMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+ break;
+ }
+ case kAdmitRequestMessage: {
+ // The payload is reinterpreted directly as an AdmitRequestMessage object
+ // (no proto parsing), so the sender must live in the same process.
+ const AdmitRequestMessage *request_message =
+ static_cast<const AdmitRequestMessage*>(tagged_message.message());
+
+ const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
+ DCHECK(!query_handles.empty());
+
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
+ }
+ break;
+ }
+ case kQueryInitiateResponseMessage: {
+ // Nothing to record yet; the loop still checks for new WorkOrders below.
+ // TODO(zuyu): check the query id.
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage: {
+ // Execution-progress messages are handed to the policy enforcer as-is.
+ policy_enforcer_->processMessage(tagged_message);
+ break;
+ }
+ case kInitiateRebuildResponseMessage: {
+ // A unique case in the distributed version.
+ policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+ break;
+ }
+ case kSaveQueryResultResponseMessage: {
+ // The result relation has been saved; notify the CLI of success.
+ S::SaveQueryResultResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+ break;
+ }
+ case kPoisonMessage: {
+ if (policy_enforcer_->hasQueries()) {
+ LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
+ "under execution or waiting to be admitted";
+ }
+
+ // Shutdown all Shiftbosses.
+ tmb::Address shiftboss_addresses;
+ for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+ shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+ }
+
+ tmb::MessageStyle broadcast_style;
+ broadcast_style.Broadcast(true);
+
+ TaggedMessage poison_message(kPoisonMessage);
+
+ const MessageBus::SendStatus send_status =
+ bus_->Send(foreman_client_id_,
+ shiftboss_addresses,
+ broadcast_style,
+ move(poison_message));
+ DCHECK(send_status == MessageBus::SendStatus::kOK);
+ // Leave the event loop; the Foreman thread terminates here.
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type to ForemanDistributed";
+ }
+
+ // Pull newly schedulable WorkOrders from the policy enforcer and send them
+ // out, unless canCollectNewMessages() rules it out for this message type
+ // or Shiftboss capacity.
+ if (canCollectNewMessages(message_type)) {
+ vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+ policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+ dispatchWorkOrderMessages(new_messages);
+ }
+ }
+}
+
+// Decides whether run() should pull new WorkOrderMessages from the policy
+// enforcer after handling a message of the given type.
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  // These two message types never trigger a collection.
+  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                    kCatalogRelationNewBlockMessage,
+                                    kWorkOrderFeedbackMessage)) {
+    return false;
+  }
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  const bool first_shiftboss_full = shiftboss_directory_.hasReachedCapacity(0);
+  return !first_shiftboss_full;
+}
+
+// Sends each WorkOrderMessage and bumps the recipient's queued-WorkOrder
+// count in the Shiftboss directory.
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  // TODO(zuyu): Multiple Shiftbosses support. Everything goes to Shiftboss #0.
+  constexpr size_t kTargetShiftbossIndex = 0;
+
+  for (const unique_ptr<S::WorkOrderMessage> &work_order_message : messages) {
+    DCHECK(work_order_message != nullptr);
+    sendWorkOrderMessage(kTargetShiftbossIndex, *work_order_message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(kTargetShiftbossIndex);
+  }
+}
+
+// Serializes `proto` into a kWorkOrderMessage and sends it to the Shiftboss
+// stored at `shiftboss_index` in the directory. Serialization or send failures
+// abort via CHECK.
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  const size_t serialized_size = proto.ByteSize();
+  char *serialized_bytes = static_cast<char*>(malloc(serialized_size));
+  CHECK(proto.SerializeToArray(serialized_bytes, serialized_size));
+
+  // The temporary buffer is released as soon as the TaggedMessage is built.
+  TaggedMessage work_order_message(static_cast<const void*>(serialized_bytes),
+                                   serialized_size,
+                                   kWorkOrderMessage);
+  free(serialized_bytes);
+
+  const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+  DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+
+  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
+      bus_, foreman_client_id_, shiftboss_client_id, move(work_order_message));
+  CHECK(status == MessageBus::SendStatus::kOK);
+}
+
+// Writes the profiling records of `query_id` to `out` as CSV: one header line,
+// then one "query,worker,operator,elapsed" line per recorded WorkOrder.
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<WorkOrderTimeEntry> &recorded_times =
+      policy_enforcer_->getProfilingResults(query_id);
+  std::fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    // Cast every column explicitly so the "%llu" specifiers always match the
+    // argument types; "%lu" with a std::size_t is undefined wherever size_t is
+    // not unsigned long (e.g. LLP64 Windows).
+    std::fprintf(out,
+                 "%llu,%llu,%llu,%llu\n",
+                 static_cast<unsigned long long>(query_id),                // NOLINT(runtime/int)
+                 static_cast<unsigned long long>(workorder_entry.worker_id),  // NOLINT(runtime/int)
+                 static_cast<unsigned long long>(workorder_entry.operator_id),  // NOLINT(runtime/int)
+                 static_cast<unsigned long long>(                          // NOLINT(runtime/int)
+                     workorder_entry.end_time - workorder_entry.start_time));
+  }
+}
+
+// Adds a Shiftboss to the directory and replies with a
+// ShiftbossRegistrationResponseMessage carrying the index it was assigned.
+void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id,
+                                                             const std::size_t work_order_capacity) {
+  // The new Shiftboss's index is the directory size *before* it is added.
+  S::ShiftbossRegistrationResponseMessage response;
+  response.set_shiftboss_index(shiftboss_directory_.size());
+
+  const size_t response_size = response.ByteSize();
+  char *response_bytes = static_cast<char*>(malloc(response_size));
+  CHECK(response.SerializeToArray(response_bytes, response_size));
+
+  TaggedMessage registration_response(static_cast<const void*>(response_bytes),
+                                      response_size,
+                                      kShiftbossRegistrationResponseMessage);
+  free(response_bytes);
+
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+             << kShiftbossRegistrationResponseMessage
+             << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
+      bus_, foreman_client_id_, shiftboss_client_id, move(registration_response));
+  CHECK(status == MessageBus::SendStatus::kOK);
+}
+
+// Sends a QueryExecutionSuccessMessage, embedding the catalog proto of the
+// result relation, to the CLI client identified by `cli_id`.
+void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  // NOTE(review): downcasts CatalogDatabaseLite* to CatalogDatabase*; assumes
+  // the Foreman was constructed with a full CatalogDatabase.
+  const auto *result_relation =
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id);
+
+  S::QueryExecutionSuccessMessage success;
+  success.mutable_result_relation()->MergeFrom(result_relation->getProto());
+
+  const size_t success_size = success.ByteSize();
+  char *success_bytes = static_cast<char*>(malloc(success_size));
+  CHECK(success.SerializeToArray(success_bytes, success_size));
+
+  TaggedMessage success_message(static_cast<const void*>(success_bytes),
+                                success_size,
+                                kQueryExecutionSuccessMessage);
+  free(success_bytes);
+
+  // Notify the CLI regarding the query result.
+  DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
+             << kQueryExecutionSuccessMessage
+             << "') to CLI with TMB client id " << cli_id;
+  const MessageBus::SendStatus status = QueryExecutionUtil::SendTMBMessage(
+      bus_, foreman_client_id_, cli_id, move(success_message));
+  CHECK(status == MessageBus::SendStatus::kOK);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..f9a326a
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ * policy enforcer and dispatches the work to Shiftbosses. It also
+ * receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether the execution of every
+   *        WorkOrder should be profiled.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+   **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query, as CSV with a header line.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to
+   *        the Shiftbosses.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given WorkOrderMessage to the specified Shiftboss.
+   *
+   * @param shiftboss_index The logical index of the recipient Shiftboss in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t shiftboss_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  /**
+   * @brief Register a new Shiftboss in the ShiftbossDirectory and reply to it
+   *        with a ShiftbossRegistrationResponseMessage carrying its index.
+   *
+   * @param shiftboss_client_id The TMB client ID of the registering Shiftboss.
+   * @param work_order_capacity The WorkOrder capacity reported by the Shiftboss.
+   **/
+  void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
+                                           const std::size_t work_order_capacity);
+
+  /**
+   * @brief Notify the CLI that a query succeeded by sending it a
+   *        QueryExecutionSuccessMessage.
+   *
+   * @param cli_id The TMB client ID of the CLI to notify.
+   * @param result_relation_id The ID of the query result relation, whose
+   *        catalog proto is embedded in the message.
+   **/
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   * @return Whether new WorkOrderMessages should be requested from the
+   *         PolicyEnforcer and dispatched.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  // Registered Shiftbosses and their queued WorkOrder counts.
+  ShiftbossDirectory shiftboss_directory_;
+
+  // Not deleted by this class.
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
[4/6] incubator-quickstep git commit: Initial commit.
Posted by ji...@apache.org.
Initial commit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ba18c07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ba18c07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ba18c07
Branch: refs/heads/LIP-for-tpch-merged
Commit: 9ba18c07b3a2ef9f23054f358a94dabdfb772414
Parents: 1111ec5
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Tue Aug 16 16:40:27 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:17:51 2016 -0500
----------------------------------------------------------------------
query_optimizer/CMakeLists.txt | 1 +
query_optimizer/ExecutionGenerator.cpp | 9 +++++
query_optimizer/PhysicalGenerator.cpp | 3 ++
query_optimizer/physical/HashJoin.cpp | 5 +++
query_optimizer/physical/HashJoin.hpp | 13 ++++++-
query_optimizer/rules/AttachBloomFilters.cpp | 1 +
query_optimizer/rules/CMakeLists.txt | 11 ++++++
query_optimizer/rules/FuseJoinSelect.cpp | 43 +++++++++++++++++++++++
query_optimizer/rules/FuseJoinSelect.hpp | 33 +++++++++++++++++
relational_operators/HashJoinOperator.cpp | 13 ++++++-
relational_operators/HashJoinOperator.hpp | 12 +++++++
relational_operators/WorkOrder.proto | 1 +
12 files changed, 143 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7440151..849caaa 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -201,6 +201,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_rules_AttachBloomFilters
+ quickstep_queryoptimizer_rules_FuseJoinSelect
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
quickstep_queryoptimizer_rules_SwapProbeBuild
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index f8559ec..457366e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -692,6 +692,14 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
query_context_proto_->add_predicates()->CopyFrom(residual_predicate->getProto());
}
+  // Convert the left (probe-side) filter predicate proto, if the optimizer
+  // fused one into this join. This must test left_filter_predicate(): testing
+  // residual_predicate() would convert a null predicate when only a residual
+  // exists, and drop the left filter when no residual exists.
+  QueryContext::predicate_id left_filter_predicate_index = QueryContext::kInvalidPredicateId;
+  if (physical_plan->left_filter_predicate()) {
+    left_filter_predicate_index = query_context_proto_->predicates_size();
+    unique_ptr<const Predicate> left_filter_predicate(
+        convertPredicate(physical_plan->left_filter_predicate()));
+    query_context_proto_->add_predicates()->CopyFrom(left_filter_predicate->getProto());
+  }
+
// Convert the project expressions proto.
const QueryContext::scalar_group_id project_expressions_group_index =
query_context_proto_->scalar_groups_size();
@@ -796,6 +804,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
insert_destination_index,
join_hash_table_index,
residual_predicate_index,
+ left_filter_predicate_index,
project_expressions_group_index,
is_selection_on_build.get(),
join_type));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 9ee685d..e093272 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
#include "query_optimizer/logical/Logical.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/rules/AttachBloomFilters.hpp"
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
#include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -102,6 +103,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
}
rules.emplace_back(new PruneColumns());
// rules.emplace_back(new SwapProbeBuild());
+ rules.emplace_back(new FuseJoinSelect());
+ rules.emplace_back(new PruneColumns());
rules.emplace_back(new AttachBloomFilters());
for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index 883c87a..dc564a7 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -111,6 +111,11 @@ void HashJoin::getFieldStringItems(
non_container_child_field_names->push_back("residual_predicate");
non_container_child_fields->push_back(residual_predicate_);
}
+ if (left_filter_predicate_ != nullptr) {
+ non_container_child_field_names->push_back("left_filter_predicate");
+ non_container_child_fields->push_back(left_filter_predicate_);
+ }
+
container_child_field_names->push_back("left_join_attributes");
container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
container_child_field_names->push_back("right_join_attributes");
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index a830d0b..e24dbeb 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -107,6 +107,10 @@ class HashJoin : public BinaryJoin {
return join_type_;
}
+ /**
+ * @return The optional filtering predicate evaluated on the probe (left)
+ * side before the join; nullptr when none was set.
+ */
+ const expressions::PredicatePtr& left_filter_predicate() const {
+ return left_filter_predicate_;
+ }
+
PhysicalPtr copyWithNewChildren(
const std::vector<PhysicalPtr> &new_children) const override {
DCHECK_EQ(children().size(), new_children.size());
@@ -117,7 +121,8 @@ class HashJoin : public BinaryJoin {
residual_predicate_,
project_expressions(),
join_type_,
- bloom_filter_config_);
+                   left_filter_predicate_,  // Same order as Create()'s parameters:
+                   bloom_filter_config_);   // left filter first, then bloom config.
}
std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -144,6 +149,7 @@ class HashJoin : public BinaryJoin {
* @param residual_predicate Optional filtering predicate evaluated after join.
* @param project_expressions The project expressions.
* @param Join type of this hash join.
+ * @param left_filter_predicate Optional filtering predicate for probe side before join.
* @return An immutable physical HashJoin.
*/
static HashJoinPtr Create(
@@ -154,6 +160,7 @@ class HashJoin : public BinaryJoin {
const expressions::PredicatePtr &residual_predicate,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
const JoinType join_type,
+ const expressions::PredicatePtr &left_filter_predicate = nullptr,
const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
return HashJoinPtr(
new HashJoin(left,
@@ -163,6 +170,7 @@ class HashJoin : public BinaryJoin {
residual_predicate,
project_expressions,
join_type,
+ left_filter_predicate,
bloom_filter_config));
}
@@ -184,12 +192,14 @@ class HashJoin : public BinaryJoin {
const expressions::PredicatePtr &residual_predicate,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
const JoinType join_type,
+ const expressions::PredicatePtr &left_filter_predicate,
const BloomFilterConfig &bloom_filter_config)
: BinaryJoin(left, right, project_expressions),
left_join_attributes_(left_join_attributes),
right_join_attributes_(right_join_attributes),
residual_predicate_(residual_predicate),
join_type_(join_type),
+ left_filter_predicate_(left_filter_predicate),
bloom_filter_config_(bloom_filter_config) {
}
@@ -197,6 +207,7 @@ class HashJoin : public BinaryJoin {
std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
expressions::PredicatePtr residual_predicate_;
JoinType join_type_;
+ expressions::PredicatePtr left_filter_predicate_;
BloomFilterConfig bloom_filter_config_;
DISALLOW_COPY_AND_ASSIGN(HashJoin);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index 898d831..f86ba60 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -338,6 +338,7 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
hash_join->residual_predicate(),
hash_join->project_expressions(),
hash_join->join_type(),
+ hash_join->left_filter_predicate(),
attach_it->second);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 54b1e59..9990a4d 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp)
add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseJoinSelect FuseJoinSelect.cpp FuseJoinSelect.hpp)
add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
@@ -65,6 +66,15 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
quickstep_queryoptimizer_rules_Rule
quickstep_queryoptimizer_rules_RuleHelper
quickstep_utility_Macros)
+# Dependencies of the FuseJoinSelect physical-plan rule.
+target_link_libraries(quickstep_queryoptimizer_rules_FuseJoinSelect
+ quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_physical_HashJoin
+ quickstep_queryoptimizer_physical_PatternMatcher
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_physical_Selection
+ quickstep_queryoptimizer_physical_TableReference
+ quickstep_queryoptimizer_rules_BottomUpRule
+ quickstep_queryoptimizer_rules_Rule)
target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
glog
quickstep_queryoptimizer_expressions_AttributeReference
@@ -206,6 +216,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
quickstep_queryoptimizer_rules_AttachBloomFilters
quickstep_queryoptimizer_rules_BottomUpRule
quickstep_queryoptimizer_rules_CollapseProject
+ quickstep_queryoptimizer_rules_FuseJoinSelect
quickstep_queryoptimizer_rules_GenerateJoins
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_PushDownFilter
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
new file mode 100644
index 0000000..6a8885f
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -0,0 +1,43 @@
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
+
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TableReference.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace P = ::quickstep::optimizer::physical;
+namespace E = ::quickstep::optimizer::expressions;
+
+P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
+  P::HashJoinPtr hash_join;  // Set when `input` is a HashJoin.
+  P::SelectionPtr selection;  // Set when the join's left child is a Selection.
+  P::TableReferencePtr table_reference;  // Set when that Selection reads directly from a base table.
+
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join)  // Pattern: HashJoin(Selection(TableReference), ...).
+      && hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin  // Only inner joins are fused.
+      && P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection)
+      && P::SomeTableReference::MatchesWithConditionalCast(selection->input(), &table_reference)) {
+    const E::PredicatePtr filter_predicate = selection->filter_predicate();
+    P::PhysicalPtr output = P::HashJoin::Create(hash_join->left(),
+                                                table_reference,
+                                                hash_join->left_join_attributes(),
+                                                hash_join->right_join_attributes(),
+                                                hash_join->residual_predicate(),
+                                                hash_join->project_expressions(),  // NOTE(review): the Selection's own project expressions are dropped; assumes they are plain attributes of table_reference -- confirm.
+                                                hash_join->join_type(),
+                                                filter_predicate);  // The Selection's filter is carried over as the join's left filter predicate.
+    LOG_APPLYING_RULE(input, output);
+    return output;
+  }
+
+  LOG_IGNORING_RULE(input);  // Pattern not matched: the node is returned unchanged.
+  return input;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/FuseJoinSelect.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.hpp b/query_optimizer/rules/FuseJoinSelect.hpp
new file mode 100644
index 0000000..24ac08b
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.hpp
@@ -0,0 +1,33 @@
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace P = ::quickstep::optimizer::physical;
+
+class FuseJoinSelect : public BottomUpRule<P::Physical> {  // Physical-plan rule: folds a Selection over a base table into an inner HashJoin's left filter predicate.
+ public:
+  FuseJoinSelect() {
+  }
+
+  std::string getName() const override { return "FuseJoinSelect"; }
+
+ protected:
+  P::PhysicalPtr applyToNode(const P::PhysicalPtr &input) override;  // Returns the rewritten HashJoin, or `input` itself if the pattern does not match.
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FuseJoinSelect);
+};
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 9b573ac..7357acd 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -192,6 +192,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
const Predicate *residual_predicate =
query_context->getPredicate(residual_predicate_index_);
+ const Predicate *left_filter_predicate =
+ query_context->getPredicate(left_filter_predicate_index_);
const vector<unique_ptr<const Scalar>> &selection =
query_context->getScalarGroup(selection_index_);
InsertDestination *output_destination =
@@ -210,6 +212,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
any_join_key_attributes_nullable_,
probe_block_id,
residual_predicate,
+ left_filter_predicate,
selection,
hash_table,
output_destination,
@@ -230,6 +233,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
any_join_key_attributes_nullable_,
probe_relation_block_ids_[num_workorders_generated_],
residual_predicate,
+ left_filter_predicate,
selection,
hash_table,
output_destination,
@@ -370,6 +374,7 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
+ proto->SetExtension(serialization::HashJoinWorkOrder::left_filter_predicate_index, left_filter_predicate_index_);
return proto;
}
@@ -432,7 +437,13 @@ void HashInnerJoinWorkOrder::execute() {
storage_manager_->getBlock(block_id_, probe_relation_));
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
- std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ std::unique_ptr<ValueAccessor> probe_accessor(
+ probe_store.createValueAccessor(
+ left_filter_predicate_ == nullptr
+ ? nullptr
+ : probe_block->getMatchesForPredicate(left_filter_predicate_)));
+
+
MapBasedJoinedTupleCollector collector;
if (join_key_attributes_.size() == 1) {
hash_table_.getAllFromValueAccessor(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 30571a1..05e16a4 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -128,6 +128,7 @@ class HashJoinOperator : public RelationalOperator {
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::join_hash_table_id hash_table_index,
const QueryContext::predicate_id residual_predicate_index,
+ const QueryContext::predicate_id left_filter_predicate_index,
const QueryContext::scalar_group_id selection_index,
const std::vector<bool> *is_selection_on_build = nullptr,
const JoinType join_type = JoinType::kInnerJoin)
@@ -141,6 +142,7 @@ class HashJoinOperator : public RelationalOperator {
output_destination_index_(output_destination_index),
hash_table_index_(hash_table_index),
residual_predicate_index_(residual_predicate_index),
+ left_filter_predicate_index_(left_filter_predicate_index),
selection_index_(selection_index),
is_selection_on_build_(is_selection_on_build == nullptr
? std::vector<bool>()
@@ -256,6 +258,7 @@ class HashJoinOperator : public RelationalOperator {
const QueryContext::insert_destination_id output_destination_index_;
const QueryContext::join_hash_table_id hash_table_index_;
const QueryContext::predicate_id residual_predicate_index_;
+ const QueryContext::predicate_id left_filter_predicate_index_;
const QueryContext::scalar_group_id selection_index_;
const std::vector<bool> is_selection_on_build_;
const JoinType join_type_;
@@ -304,6 +307,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id lookup_block_id,
const Predicate *residual_predicate,
+ const Predicate *left_filter_predicate,
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
@@ -316,6 +320,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
+ left_filter_predicate_(left_filter_predicate),
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -388,6 +393,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable_;
const block_id block_id_;
const Predicate *residual_predicate_;
+ const Predicate *left_filter_predicate_;
const std::vector<std::unique_ptr<const Scalar>> &selection_;
const JoinHashTable &hash_table_;
@@ -434,6 +440,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id lookup_block_id,
const Predicate *residual_predicate,
+ const Predicate *left_filter_predicate,
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
@@ -446,6 +453,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
+ left_filter_predicate_(left_filter_predicate),
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -514,6 +522,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable_;
const block_id block_id_;
const Predicate *residual_predicate_;
+ const Predicate *left_filter_predicate_;
const std::vector<std::unique_ptr<const Scalar>> &selection_;
const JoinHashTable &hash_table_;
@@ -560,6 +569,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id lookup_block_id,
const Predicate *residual_predicate,
+ const Predicate *left_filter_predicate,
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
@@ -572,6 +582,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
+ left_filter_predicate_(left_filter_predicate),
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -645,6 +656,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable_;
const block_id block_id_;
const Predicate *residual_predicate_;
+ const Predicate *left_filter_predicate_;
const std::vector<std::unique_ptr<const Scalar>> &selection_;
const JoinHashTable &hash_table_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 02aa50e..4874450 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -128,6 +128,7 @@ message HashJoinWorkOrder {
// Used by all but HashOuterJoinWorkOrder.
optional int32 residual_predicate_index = 169;
+ optional int32 left_filter_predicate_index = 400;
// Used by HashOuterJoinWorkOrder only.
repeated bool is_selection_on_build = 170;
}
[2/6] incubator-quickstep git commit: Fixed a bug in deserializing
WindowAggrWorkOrder.
Posted by ji...@apache.org.
Fixed a bug in deserializing WindowAggrWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9838fcd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9838fcd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9838fcd1
Branch: refs/heads/LIP-for-tpch-merged
Commit: 9838fcd16d7e70edba4dfca3ec979de1231e37e7
Parents: e7524cb
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Aug 12 08:59:12 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500
----------------------------------------------------------------------
relational_operators/WorkOrderFactory.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9838fcd1/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 7d7af59..6970486 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
move(blocks),
query_context->getInsertDestination(
- proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
}
default:
LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
[5/6] incubator-quickstep git commit: Fixed the bug.
Posted by ji...@apache.org.
Fixed the bug.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ea7a846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ea7a846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ea7a846
Branch: refs/heads/LIP-for-tpch-merged
Commit: 2ea7a84630a184a40b54b88917f8c750307989f6
Parents: 9ba18c0
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Wed Aug 17 14:39:40 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:17:51 2016 -0500
----------------------------------------------------------------------
query_optimizer/rules/FuseJoinSelect.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ea7a846/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
index 6a8885f..e40acfc 100644
--- a/query_optimizer/rules/FuseJoinSelect.cpp
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -23,8 +23,8 @@ P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
&& P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection)
&& P::SomeTableReference::MatchesWithConditionalCast(selection->input(), &table_reference)) {
const E::PredicatePtr filter_predicate = selection->filter_predicate();
- P::PhysicalPtr output = P::HashJoin::Create(hash_join->left(),
- table_reference,
+ P::PhysicalPtr output = P::HashJoin::Create(table_reference,
+ hash_join->right(),
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->residual_predicate(),
[6/6] incubator-quickstep git commit: Merged with fuse-select-join
Posted by ji...@apache.org.
Merged with fuse-select-join
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f4af5965
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f4af5965
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f4af5965
Branch: refs/heads/LIP-for-tpch-merged
Commit: f4af596523b17d1d5523e882f1b73e3d6c35d1b3
Parents: 2ea7a84
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Aug 18 21:17:56 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 21:17:56 2016 -0500
----------------------------------------------------------------------
query_optimizer/physical/HashJoin.hpp | 4 ++--
relational_operators/HashJoinOperator.hpp | 8 ++++++++
2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4af5965/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index e24dbeb..32b4f21 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -121,8 +121,8 @@ class HashJoin : public BinaryJoin {
residual_predicate_,
project_expressions(),
join_type_,
- bloom_filter_config_,
- left_filter_predicate_);
+ left_filter_predicate_,
+ bloom_filter_config_);
}
std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4af5965/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 05e16a4..4f53daa 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -511,6 +511,10 @@ class HashSemiJoinWorkOrder : public WorkOrder {
void execute() override;
+  const Predicate *left_filter_predicate() const {  // Non-owning pointer passed in at construction; presumably nullptr when no probe-side filter exists -- confirm.
+    return left_filter_predicate_;
+  }
+
private:
void executeWithoutResidualPredicate();
@@ -645,6 +649,10 @@ class HashAntiJoinWorkOrder : public WorkOrder {
}
}
+  const Predicate *left_filter_predicate() const {  // Non-owning pointer passed in at construction; presumably nullptr when no probe-side filter exists -- confirm.
+    return left_filter_predicate_;
+  }
+
private:
void executeWithoutResidualPredicate();
[3/6] incubator-quickstep git commit: Minor fixes for the distributed
version.
Posted by ji...@apache.org.
Minor fixes for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/47d1248a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/47d1248a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/47d1248a
Branch: refs/heads/LIP-for-tpch-merged
Commit: 47d1248a5c61ba0458246a137b788f238ae094d0
Parents: 9838fcd
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:22:41 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 16 +++++++-------
query_execution/PolicyEnforcerDistributed.cpp | 10 ++++-----
query_execution/PolicyEnforcerDistributed.hpp | 6 +++---
query_execution/QueryExecutionTypedefs.hpp | 4 ++--
query_execution/Shiftboss.cpp | 20 +++++++++++++++---
query_execution/Shiftboss.hpp | 22 +++++++++++++-------
.../tests/execution_generator/CMakeLists.txt | 2 +-
7 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ff0fe08..c7b9d61 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,7 +31,7 @@ endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
@@ -52,12 +52,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,7 +80,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_threading_ThreadUtil
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ForemanBase
glog
quickstep_threading_Thread
@@ -223,7 +223,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
@@ -262,7 +262,7 @@ if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
quickstep_utility_Macros
tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
glog
quickstep_relationaloperators_WorkOrder_proto
@@ -321,7 +321,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_QueryManagerDistributed
quickstep_queryexecution_Shiftboss
quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
# Tests:
if (ENABLE_DISTRIBUTED)
@@ -347,7 +347,7 @@ if (ENABLE_DISTRIBUTED)
tmb
${LIBS})
add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
add_executable(QueryManagerSingleNode_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..47491ed 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,16 +58,16 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");
-void PolicyEnforcerDistributed::getWorkOrderMessages(
- vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
+ vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) {
// Iterate over admitted queries until either there are no more
// messages available, or the maximum number of messages have
// been collected.
- DCHECK(work_order_messages->empty());
+ DCHECK(work_order_proto_messages->empty());
// TODO(harshad) - Make this function generic enough so that it
// works well when multiple queries are getting executed.
if (admitted_queries_.empty()) {
- LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
return;
}
@@ -86,7 +86,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
if (next_work_order_message != nullptr) {
++messages_collected_curr_query;
- work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+ work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
} else {
// No more work ordes from the current query at this time.
// Check if the query's execution is over.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..bce3e0c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -76,10 +76,10 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
* @brief Get work order messages to be dispatched. These messages come from
* the active queries.
*
- * @param work_order_messages The work order messages to be dispatched.
+ * @param work_order_proto_messages The work order messages to be dispatched.
**/
- void getWorkOrderMessages(
- std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+ void getWorkOrderProtoMessages(
+ std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages);
/**
* @brief Process the initiate rebuild work order response message.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..bba67e3 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,8 +63,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
// We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
- kAdmitRequestMessage, // Requesting a query (or queries) to be admitted, from
- // the main thread to Foreman.
+ kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from
+ // the main thread to Foreman.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
proto.relation_id());
break;
}
- case kWorkOrderCompleteMessage: // Fall through.
- case kRebuildWorkOrderCompleteMessage:
+ case kCatalogRelationNewBlockMessage: // Fall through.
case kDataPipelineMessage:
- case kWorkOrderFeedbackMessage: {
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrderCompleteMessage:
+ case kRebuildWorkOrderCompleteMessage: {
DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
<< "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
CHECK(send_status == MessageBus::SendStatus::kOK);
break;
}
+ case kQueryTeardownMessage: {
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+ serialization::QueryTeardownMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ query_contexts_.erase(proto.query_id());
+ break;
+ }
case kSaveQueryResultMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
@@ -175,8 +185,12 @@ void Shiftboss::run() {
storage_manager_->saveBlockOrBlob(proto.blocks(i));
}
+ // Clean up query execution states, i.e., QueryContext.
+ query_contexts_.erase(proto.query_id());
+
serialization::SaveQueryResultResponseMessage proto_response;
proto_response.set_relation_id(proto.relation_id());
+ proto_response.set_cli_id(proto.cli_id());
const size_t proto_response_length = proto_response.ByteSize();
char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <unordered_map>
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
// Message sent to Worker.
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
- // Message sent to Foreman.
- bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
// Forward the following message types from Foreman to Workers.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
// Forward the following message types from Workers to Foreman.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+ bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+ // Clean up query execution states, i.e., QueryContext.
+ bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
// Stop itself.
bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..0c00ff6 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -83,4 +83,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)