You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:29 UTC

[37/50] incubator-quickstep git commit: Scheduled WorkOrders w/ the same aggr_state_index on the same Shiftboss.

Scheduled WorkOrders w/ the same aggr_state_index on the same Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/57730e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/57730e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/57730e40

Branch: refs/heads/quickstep_partition_parser_support
Commit: 57730e406533fa02280568380c853ce1c5dc19ee
Parents: e75c265
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 17:04:30 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 17:28:30 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 61 ++++++++++++++++------
 query_execution/ForemanDistributed.hpp        |  4 ++
 query_execution/PolicyEnforcerDistributed.cpp | 12 +++++
 query_execution/PolicyEnforcerDistributed.hpp | 18 +++++++
 query_execution/QueryManagerDistributed.hpp   | 23 ++++++++
 5 files changed, 102 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index fc9cd3c..61f0603 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,6 +243,32 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag
                                        kWorkOrderFeedbackMessage);
 }
 
+bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
+                                                       const size_t next_shiftboss_index_to_schedule,
+                                                       size_t *shiftboss_index_for_aggregation) {
+  const S::WorkOrder &work_order_proto = proto.work_order();
+  QueryContext::aggregation_state_id aggr_state_index;
+
+  switch (work_order_proto.work_order_type()) {
+    case S::AGGREGATION:
+      aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+      break;
+    case S::FINALIZE_AGGREGATION:
+      aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
+      break;
+    case S::DESTROY_AGGREGATION_STATE:
+      aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
+      break;
+    default:
+      return false;
+  }
+
+  static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
+      proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
+
+  return true;
+}
+
 bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,
                                                     const size_t next_shiftboss_index_to_schedule,
                                                     size_t *shiftboss_index_for_hash_join) {
@@ -251,13 +277,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
 
   switch (work_order_proto.work_order_type()) {
     case S::BUILD_HASH:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index);
-      break;
-    case S::DESTROY_HASH:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index);
+      join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
       break;
     case S::HASH_JOIN:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index);
+      join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
+      break;
+    case S::DESTROY_HASH:
+      join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index);
       break;
     default:
       return false;
@@ -275,20 +301,24 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
     const S::WorkOrderMessage &proto = *message;
-    size_t shiftboss_index_for_hash_join;
-    if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) {
-      sendWorkOrderMessage(shiftboss_index_for_hash_join, proto);
-      shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join);
-
-      if (shiftboss_index == shiftboss_index_for_hash_join) {
-        shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
-      }
-    } else {
-      sendWorkOrderMessage(shiftboss_index, proto);
-      shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
-
+    // Aggregation and HashJoin related WorkOrders sharing the same state go to
+    // the Shiftboss recorded for that state; the helpers set the index for them.
+    size_t shiftboss_index_for_particular_work_order_type;
+    if (!isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type) &&
+        !isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
       // TODO(zuyu): Take data-locality into account for scheduling.
+      shiftboss_index_for_particular_work_order_type = shiftboss_index;
+    }
+
+    sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
+
+    if (shiftboss_index == shiftboss_index_for_particular_work_order_type) {
       shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+    } else {
+      // NOTE(zuyu): This is not exact round-robin scheduling, as in this case
+      // <shiftboss_index_for_particular_work_order_type> may be assigned one
+      // more WorkOrder for an Aggregation or a HashJoin.
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 0616f30..34bac07 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -71,6 +71,26 @@ class ForemanDistributed final : public ForemanBase {
   void run() override;
 
  private:
+  /**
+   * @brief Check whether a WorkOrder is Aggregation related, i.e., of type
+   * AGGREGATION, FINALIZE_AGGREGATION, or DESTROY_AGGREGATION_STATE, and if
+   * so, get the index of the Shiftboss to schedule it on.
+   *
+   * @note Within a query, the first Aggregation related WorkOrder on an
+   * aggr_state_index fixes the Shiftboss used for all later ones on it.
+   *
+   * @param proto The WorkOrderMessage to check.
+   * @param next_shiftboss_index_to_schedule The index of the Shiftboss that
+   * round-robin scheduling would use for this WorkOrder.
+   * @param shiftboss_index_for_aggregation Output: the index of the Shiftboss
+   * to schedule the WorkOrder on. Set only if this returns true.
+   *
+   * @return True if the WorkOrder is Aggregation related; false otherwise.
+   **/
+  bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+                                     const std::size_t next_shiftboss_index_to_schedule,
+                                     std::size_t *shiftboss_index_for_aggregation);
+
   bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
                                   const std::size_t next_shiftboss_index_to_schedule,
                                   std::size_t *shiftboss_index_for_hash_join);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 86b36c8..c5642bc 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -158,6 +158,21 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   }
 }
 
+void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
+    const std::size_t query_id,
+    const QueryContext::aggregation_state_id aggr_state_index,
+    const std::size_t next_shiftboss_index_to_schedule,
+    std::size_t *shiftboss_index) {
+  // Look up the query once via find(): operator[] would silently insert a
+  // default-constructed entry for an unknown <query_id> when DCHECK is off.
+  const auto query_it = admitted_queries_.find(query_id);
+  DCHECK(query_it != admitted_queries_.end());
+  QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(query_it->second.get());
+  query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 next_shiftboss_index_to_schedule,
+                                                 shiftboss_index);
+}
+
 void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
     const std::size_t query_id,
     const QueryContext::join_hash_table_id join_hash_table_index,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 37326bd..e8bc394 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -90,6 +90,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
   /**
+   * @brief Get or set the index of Shiftboss for an Aggregation related
+   * WorkOrder. If it is the first Aggregation related WorkOrder on
+   * <aggr_state_index>, <shiftboss_index> will be set to
+   * <next_shiftboss_index_to_schedule>. Otherwise, <shiftboss_index> will be
+   * set to the index of the Shiftboss that the first one was scheduled on.
+   *
+   * @param query_id The query id.
+   * @param aggr_state_index The id of the aggregation state.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index Output: the index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForAggregation(
+      const std::size_t query_id,
+      const QueryContext::aggregation_state_id aggr_state_index,
+      const std::size_t next_shiftboss_index_to_schedule,
+      std::size_t *shiftboss_index);
+
+  /**
    * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
    * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index>
    * will be set to <next_shiftboss_index_to_schedule>. Otherwise,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 2b21303..7a07fcb 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -95,6 +95,26 @@ class QueryManagerDistributed final : public QueryManagerBase {
       const dag_node_index start_operator_index);
 
   /**
+   * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
+   * none is recorded for <aggr_state_index>, record and use <next_shiftboss_index_to_schedule>.
+   *
+   * @param aggr_state_index The id of the aggregation state.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index Output: the index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const std::size_t next_shiftboss_index_to_schedule,
+                                       std::size_t *shiftboss_index) {
+    const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index);
+    if (cit != shiftboss_indexes_for_aggrs_.end()) {
+      *shiftboss_index = cit->second;
+    } else {
+      shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, next_shiftboss_index_to_schedule);
+      *shiftboss_index = next_shiftboss_index_to_schedule;
+    }
+  }
+
+  /**
    * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
    * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
    *
@@ -136,6 +156,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
 
+  // A map from an aggregation state id to its scheduled Shiftboss index.
+  std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_;
+
   // A map from a join hash table to its scheduled Shiftboss index.
   std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;