You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:29 UTC
[37/50] incubator-quickstep git commit: Scheduled WorkOrders w/ the
same aggr_state_index on the same Shiftboss.
Scheduled WorkOrders w/ the same aggr_state_index on the same Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/57730e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/57730e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/57730e40
Branch: refs/heads/quickstep_partition_parser_support
Commit: 57730e406533fa02280568380c853ce1c5dc19ee
Parents: e75c265
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 17:04:30 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 17:28:30 2016 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 61 ++++++++++++++++------
query_execution/ForemanDistributed.hpp | 4 ++
query_execution/PolicyEnforcerDistributed.cpp | 12 +++++
query_execution/PolicyEnforcerDistributed.hpp | 18 +++++++
query_execution/QueryManagerDistributed.hpp | 23 ++++++++
5 files changed, 102 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index fc9cd3c..61f0603 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,6 +243,32 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag
kWorkOrderFeedbackMessage);
}
+bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
+ const size_t next_shiftboss_index_to_schedule,
+ size_t *shiftboss_index_for_aggregation) {
+ const S::WorkOrder &work_order_proto = proto.work_order();
+ QueryContext::aggregation_state_id aggr_state_index;
+
+ switch (work_order_proto.work_order_type()) {
+ case S::AGGREGATION:
+ aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+ break;
+ case S::FINALIZE_AGGREGATION:
+ aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
+ break;
+ case S::DESTROY_AGGREGATION_STATE:
+ aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
+ break;
+ default:
+ return false;
+ }
+
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
+ proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
+
+ return true;
+}
+
bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,
const size_t next_shiftboss_index_to_schedule,
size_t *shiftboss_index_for_hash_join) {
@@ -251,13 +277,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
switch (work_order_proto.work_order_type()) {
case S::BUILD_HASH:
- join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index);
- break;
- case S::DESTROY_HASH:
- join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index);
+ join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
break;
case S::HASH_JOIN:
- join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index);
+ join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
+ break;
+ case S::DESTROY_HASH:
+ join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index);
break;
default:
return false;
@@ -275,20 +301,23 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
- size_t shiftboss_index_for_hash_join;
- if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) {
- sendWorkOrderMessage(shiftboss_index_for_hash_join, proto);
- shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join);
-
- if (shiftboss_index == shiftboss_index_for_hash_join) {
- shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
- }
+ size_t shiftboss_index_for_particular_work_order_type;
+ if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else {
- sendWorkOrderMessage(shiftboss_index, proto);
- shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
-
// TODO(zuyu): Take data-locality into account for scheduling.
+ shiftboss_index_for_particular_work_order_type = shiftboss_index;
+ }
+
+ sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto);
+ shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
+
+ if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+ } else {
+ // NOTE(zuyu): This is not exact round-robin scheduling: in this case,
+ // <shiftboss_index_for_particular_work_order_type> may be assigned one
+ // more WorkOrder for an Aggregation or a HashJoin.
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 0616f30..34bac07 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -71,6 +71,10 @@ class ForemanDistributed final : public ForemanBase {
void run() override;
private:
+ bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index_for_aggregation);
+
bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_hash_join);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 86b36c8..c5642bc 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
}
}
+void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
+ const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+ query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+ next_shiftboss_index_to_schedule,
+ shiftboss_index);
+}
+
void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
const std::size_t query_id,
const QueryContext::join_hash_table_id join_hash_table_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 37326bd..e8bc394 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -90,6 +90,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
/**
+ * @brief Get or set the index of Shiftboss for an Aggregation related
+ * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
+ * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+ * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
+ * has executed the first Aggregation.
+ *
+ * @param query_id The query id.
+ * @param aggr_state_index The index of the aggregation state for the Aggregation.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForAggregation(
+ const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index);
+
+ /**
* @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
* If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index>
* will be set to <next_shiftboss_index_to_schedule>. Otherwise,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 2b21303..7a07fcb 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -95,6 +95,26 @@ class QueryManagerDistributed final : public QueryManagerBase {
const dag_node_index start_operator_index);
/**
+ * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
+ * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+ *
+ * @param aggr_state_index The index of the aggregation state for the Aggregation.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index) {
+ const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index);
+ if (cit != shiftboss_indexes_for_aggrs_.end()) {
+ *shiftboss_index = cit->second;
+ } else {
+ shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, next_shiftboss_index_to_schedule);
+ *shiftboss_index = next_shiftboss_index_to_schedule;
+ }
+ }
+
+ /**
* @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
* Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
*
@@ -136,6 +156,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
+ // A map from an aggregation id to its scheduled Shiftboss index.
+ std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_;
+
// A map from a join hash table to its scheduled Shiftboss index.
std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;