You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:28 UTC

[36/50] incubator-quickstep git commit: Scheduled HashJoinWorkOrder on the same Shiftboss of BuildHashWorkOrder.

Scheduled HashJoinWorkOrder on the same Shiftboss of BuildHashWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e75c265e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e75c265e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e75c265e

Branch: refs/heads/quickstep_partition_parser_support
Commit: e75c265eb279256d89f4fa634c542d566ce3cd7a
Parents: c4f4b28
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 13 20:20:56 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 16:45:22 2016 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  3 ++
 query_execution/ForemanDistributed.cpp        | 47 ++++++++++++++++++++--
 query_execution/ForemanDistributed.hpp        |  4 ++
 query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++
 query_execution/PolicyEnforcerDistributed.hpp | 19 +++++++++
 query_execution/QueryManagerDistributed.hpp   | 25 ++++++++++++
 6 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 719d9f3..1f7add8 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -108,10 +108,12 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
+                        quickstep_queryexecution_QueryContext
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_relationaloperators_WorkOrder_proto
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros
@@ -153,6 +155,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogRelation
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_PolicyEnforcerBase
+                        quickstep_queryexecution_QueryContext
                         quickstep_queryexecution_QueryContext_proto
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionState

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 7dccce4..fc9cd3c 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -30,10 +30,12 @@
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -241,16 +243,53 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag
                                         kWorkOrderFeedbackMessage);
 }
 
+bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,
+                                                    const size_t next_shiftboss_index_to_schedule,
+                                                    size_t *shiftboss_index_for_hash_join) {
+  const S::WorkOrder &work_order_proto = proto.work_order();
+  QueryContext::join_hash_table_id join_hash_table_index;
+
+  switch (work_order_proto.work_order_type()) {
+    case S::BUILD_HASH:
+      join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index);
+      break;
+    case S::DESTROY_HASH:
+      join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index);
+      break;
+    case S::HASH_JOIN:
+      join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index);
+      break;
+    default:
+      return false;
+  }
+
+  static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
+      proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join);
+
+  return true;
+}
+
 void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
   const size_t num_shiftbosses = shiftboss_directory_.size();
   size_t shiftboss_index = 0u;
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
-    sendWorkOrderMessage(shiftboss_index, *message);
-    shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
+    const S::WorkOrderMessage &proto = *message;
+    size_t shiftboss_index_for_hash_join;
+    if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) {
+      sendWorkOrderMessage(shiftboss_index_for_hash_join, proto);
+      shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join);
+
+      if (shiftboss_index == shiftboss_index_for_hash_join) {
+        shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+      }
+    } else {
+      sendWorkOrderMessage(shiftboss_index, proto);
+      shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
 
-    // TO(zuyu): Take data-locality into account for scheduling.
-    shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+      // TODO(zuyu): Take data-locality into account for scheduling.
+      shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index ccdd0ae..0616f30 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -71,6 +71,10 @@ class ForemanDistributed final : public ForemanBase {
   void run() override;
 
  private:
+  bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+                                  const std::size_t next_shiftboss_index_to_schedule,
+                                  std::size_t *shiftboss_index_for_hash_join);
+
   /**
    * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
    *        worker threads.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6e09ea8..86b36c8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   }
 }
 
+void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
+    const std::size_t query_id,
+    const QueryContext::join_hash_table_id join_hash_table_index,
+    const std::size_t next_shiftboss_index_to_schedule,
+    std::size_t *shiftboss_index) {
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+  query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
+                                              next_shiftboss_index_to_schedule,
+                                              shiftboss_index);
+}
+
 void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
   S::QueryInitiateMessage proto;
   proto.set_query_id(query_handle->query_id());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 146e9af..37326bd 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "utility/Macros.hpp"
@@ -88,6 +89,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    **/
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
+  /**
+   * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
+   * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index>
+   * will be set to <next_shiftboss_index_to_schedule>. Otherwise,
+   * <shiftboss_index> will be set to the index of the Shiftboss that has
+   * executed the first BuildHash.
+   *
+   * @param query_id The query id.
+   * @param join_hash_table_index The Hash Table for the Join.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForHashJoin(
+      const std::size_t query_id,
+      const QueryContext::join_hash_table_id join_hash_table_index,
+      const std::size_t next_shiftboss_index_to_schedule,
+      std::size_t *shiftboss_index);
+
  private:
   void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
     shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index f8ac53c..2b21303 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -22,7 +22,9 @@
 
 #include <cstddef>
 #include <memory>
+#include <unordered_map>
 
+#include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
@@ -92,6 +94,26 @@ class QueryManagerDistributed final : public QueryManagerBase {
   serialization::WorkOrderMessage* getNextWorkOrderMessage(
       const dag_node_index start_operator_index);
 
+  /**
+   * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
+   * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   *
+   * @param join_hash_table_index The Hash Table for the Join.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
+                                    const std::size_t next_shiftboss_index_to_schedule,
+                                    std::size_t *shiftboss_index) {
+    const auto cit = shiftboss_indexes_for_hash_joins_.find(join_hash_table_index);
+    if (cit != shiftboss_indexes_for_hash_joins_.end()) {
+      *shiftboss_index = cit->second;
+    } else {
+      shiftboss_indexes_for_hash_joins_.emplace(join_hash_table_index, next_shiftboss_index_to_schedule);
+      *shiftboss_index = next_shiftboss_index_to_schedule;
+    }
+  }
+
  private:
   bool checkNormalExecutionOver(const dag_node_index index) const override {
     return (checkAllDependenciesMet(index) &&
@@ -114,6 +136,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
 
+  // A map from a join hash table to its scheduled Shiftboss index.
+  std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
 };