You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:28 UTC
[36/50] incubator-quickstep git commit: Scheduled HashJoinWorkOrder
on the same Shiftboss of BuildHashWorkOrder.
Scheduled HashJoinWorkOrder on the same Shiftboss of BuildHashWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e75c265e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e75c265e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e75c265e
Branch: refs/heads/quickstep_partition_parser_support
Commit: e75c265eb279256d89f4fa634c542d566ce3cd7a
Parents: c4f4b28
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 13 20:20:56 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 16:45:22 2016 -0800
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 3 ++
query_execution/ForemanDistributed.cpp | 47 ++++++++++++++++++++--
query_execution/ForemanDistributed.hpp | 4 ++
query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++
query_execution/PolicyEnforcerDistributed.hpp | 19 +++++++++
query_execution/QueryManagerDistributed.hpp | 25 ++++++++++++
6 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 719d9f3..1f7add8 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -108,10 +108,12 @@ if (ENABLE_DISTRIBUTED)
quickstep_queryexecution_ForemanBase
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_PolicyEnforcerDistributed
+ quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_ShiftbossDirectory
+ quickstep_relationaloperators_WorkOrder_proto
quickstep_threading_ThreadUtil
quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
@@ -153,6 +155,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_catalog_CatalogRelation
quickstep_catalog_Catalog_proto
quickstep_queryexecution_PolicyEnforcerBase
+ quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 7dccce4..fc9cd3c 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -30,10 +30,12 @@
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
@@ -241,16 +243,53 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag
kWorkOrderFeedbackMessage);
}
+bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,  // Reports whether proto wraps a BUILD_HASH, DESTROY_HASH or HASH_JOIN work order.
+ const size_t next_shiftboss_index_to_schedule,  // Forwarded unchanged to PolicyEnforcerDistributed::getShiftbossIndexForHashJoin.
+ size_t *shiftboss_index_for_hash_join) {  // Out-parameter; only handed to the policy enforcer when this function returns true.
+ const S::WorkOrder &work_order_proto = proto.work_order();
+ QueryContext::join_hash_table_id join_hash_table_index;  // Left uninitialized here; every case that does not return assigns it.
+
+ switch (work_order_proto.work_order_type()) {  // Each hash-join related type reads its own join_hash_table_index extension.
+ case S::BUILD_HASH:
+ join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index);
+ break;
+ case S::DESTROY_HASH:
+ join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index);
+ break;
+ case S::HASH_JOIN:
+ join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index);
+ break;
+ default:
+ return false;  // Any other work order type: the out-parameter is left untouched.
+ }
+
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(  // Unchecked downcast of policy_enforcer_.
+ proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join);
+
+ return true;
+}
+
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
const size_t num_shiftbosses = shiftboss_directory_.size();
size_t shiftboss_index = 0u;
for (const auto &message : messages) {
DCHECK(message != nullptr);
- sendWorkOrderMessage(shiftboss_index, *message);
- shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
+ const S::WorkOrderMessage &proto = *message;
+ size_t shiftboss_index_for_hash_join;
+ if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) {
+ sendWorkOrderMessage(shiftboss_index_for_hash_join, proto);
+ shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join);
+
+ if (shiftboss_index == shiftboss_index_for_hash_join) {
+ shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ }
+ } else {
+ sendWorkOrderMessage(shiftboss_index, proto);
+ shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
- // TO(zuyu): Take data-locality into account for scheduling.
- shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ // TODO(zuyu): Take data-locality into account for scheduling.
+ shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index ccdd0ae..0616f30 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -71,6 +71,10 @@ class ForemanDistributed final : public ForemanBase {
void run() override;
private:
+ bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index_for_hash_join);
+
/**
* @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
* worker threads.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6e09ea8..86b36c8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
}
}
+void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(  // Delegates to the QueryManagerDistributed stored for query_id.
+ const std::size_t query_id,
+ const QueryContext::join_hash_table_id join_hash_table_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());  // query_id is expected to already be in admitted_queries_.
+ QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());  // Unchecked downcast of the stored manager.
+ query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
+ next_shiftboss_index_to_schedule,
+ shiftboss_index);
+}
+
void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
S::QueryInitiateMessage proto;
proto.set_query_id(query_handle->query_id());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 146e9af..37326bd 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -20,6 +20,7 @@
#include <vector>
#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/ShiftbossDirectory.hpp"
#include "utility/Macros.hpp"
@@ -88,6 +89,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
**/
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
+ /**
+ * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
+ * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index>
+ * will be set to <next_shiftboss_index_to_schedule>. Otherwise,
+ * <shiftboss_index> will be set to the index of the Shiftboss that has
+ * executed the first BuildHash.
+ *
+ * @param query_id The query id.
+ * @param join_hash_table_index The Hash Table for the Join.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index Output: the index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForHashJoin(
+ const std::size_t query_id,
+ const QueryContext::join_hash_table_id join_hash_table_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index);
+
private:
void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index f8ac53c..2b21303 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -22,7 +22,9 @@
#include <cstddef>
#include <memory>
+#include <unordered_map>
+#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
@@ -92,6 +94,26 @@ class QueryManagerDistributed final : public QueryManagerBase {
serialization::WorkOrderMessage* getNextWorkOrderMessage(
const dag_node_index start_operator_index);
+ /**
+ * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If none is
+ * recorded yet for the hash table, record <next_shiftboss_index_to_schedule> and use it.
+ *
+ * @param join_hash_table_index The Hash Table for the Join.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index Output: the index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index) {
+ const auto cit = shiftboss_indexes_for_hash_joins_.find(join_hash_table_index);
+ if (cit != shiftboss_indexes_for_hash_joins_.end()) {
+ *shiftboss_index = cit->second;  // Reuse the Shiftboss index recorded earlier for this hash table.
+ } else {
+ shiftboss_indexes_for_hash_joins_.emplace(join_hash_table_index, next_shiftboss_index_to_schedule);  // First request: remember the proposed index.
+ *shiftboss_index = next_shiftboss_index_to_schedule;
+ }
+ }
+
private:
bool checkNormalExecutionOver(const dag_node_index index) const override {
return (checkAllDependenciesMet(index) &&
@@ -114,6 +136,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
+ // A map from a join hash table to its scheduled Shiftboss index.
+ std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;
+
DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
};