You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 23:35:36 UTC

incubator-quickstep git commit: Added the execution support for LIP with partitions.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 5fbfd2111 -> 1c749c911


Added the execution support for LIP with partitions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1c749c91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1c749c91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1c749c91

Branch: refs/heads/master
Commit: 1c749c911814926c48b8c4f7f34519605dc435a2
Parents: 5fbfd21
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Jun 15 17:38:53 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 15 17:38:53 2017 -0500

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        |  5 ++++-
 query_execution/PolicyEnforcerDistributed.cpp |  2 ++
 query_execution/PolicyEnforcerDistributed.hpp |  2 ++
 query_execution/QueryContext.proto            |  2 ++
 query_execution/QueryManagerDistributed.cpp   |  6 ++++++
 query_execution/QueryManagerDistributed.hpp   | 24 ++++++++++++++++------
 6 files changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 49f2101..942f383 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -329,6 +329,7 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
                                                size_t *shiftboss_index_for_lip) {
   const S::WorkOrder &work_order_proto = proto.work_order();
   vector<QueryContext::lip_filter_id> lip_filter_indexes;
+  partition_id part_id;
   block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
@@ -336,12 +337,14 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
       for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) {
         lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i));
       }
+      part_id = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::partition_id);
       block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id);
       break;
     case S::SELECT:
       for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) {
         lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i));
       }
+      part_id = work_order_proto.GetExtension(S::SelectWorkOrder::partition_id);
       block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
       break;
     default:
@@ -349,7 +352,7 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip(
-      proto.query_id(), lip_filter_indexes, block_locator_, block, next_shiftboss_index_to_schedule,
+      proto.query_id(), lip_filter_indexes, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
       shiftboss_index_for_lip);
 
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 766c351..2bd25fa 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -232,6 +232,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
 void PolicyEnforcerDistributed::getShiftbossIndexForLip(
     const std::size_t query_id,
     const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+    const partition_id part_id,
     const BlockLocator &block_locator,
     const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
@@ -239,6 +240,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForLip(
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForLip(lip_filter_indexes,
+                                         part_id,
                                          block_locator,
                                          block,
                                          next_shiftboss_index_to_schedule,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 23b0017..25a8ce0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -187,6 +187,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    *
    * @param query_id The query id.
    * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
+   * @param part_id The partition ID.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -195,6 +196,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void getShiftbossIndexForLip(
       const std::size_t query_id,
       const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+      const partition_id part_id,
       const BlockLocator &block_locator,
       const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b76374c..6ad51b5 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -74,4 +74,6 @@ message QueryContext {
   required uint64 query_id = 13;
 
   repeated uint64 num_partitions_for_nested_loops_joins = 14;
+
+  optional uint64 num_partitions_for_lips = 15 [default = 1];
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 77a605e..a248391 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -92,6 +92,12 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
   }
 
   computeLipFilterEquivalenceClasses(query_context_proto);
+
+  const size_t num_partitions_for_lips = query_context_proto.num_partitions_for_lips();
+  for (const LipFilterGroupIndex lip_filter_groups_index : lip_filter_groups_indexes_) {
+    shiftboss_indexes_for_lip_filter_groups_.emplace(lip_filter_groups_index,
+                                                     vector<size_t>(num_partitions_for_lips, kInvalidShiftbossIndex));
+  }
 }
 
 void QueryManagerDistributed::computeLipFilterEquivalenceClasses(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 6490eb7..a021fdd 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -127,7 +127,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
       return;
     }
 
-    getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+    getShiftbossIndexForLip(lip_filter_indexes, part_id, block_locator, block, next_shiftboss_index_to_schedule,
                             shiftboss_index);
 
     shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] = *shiftboss_index;
@@ -161,7 +161,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
       return;
     }
 
-    getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+    getShiftbossIndexForLip(lip_filter_indexes, part_id, block_locator, block, next_shiftboss_index_to_schedule,
                             shiftboss_index);
 
     shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = *shiftboss_index;
@@ -173,28 +173,40 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param lip_filter_indexes The LIP filter indexes.
+   * @param part_id The partition ID.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForLip(const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+                               const partition_id part_id,
                                const BlockLocator &block_locator,
                                const block_id block,
                                const std::size_t next_shiftboss_index_to_schedule,
                                std::size_t *shiftboss_index) {
+#ifdef QUICKSTEP_DEBUG
+    if (!lip_filter_indexes.empty()) {
+      DCHECK_LT(lip_filter_indexes.front(), lip_filter_groups_indexes_.size());
+      const auto cit =
+          shiftboss_indexes_for_lip_filter_groups_.find(lip_filter_groups_indexes_[lip_filter_indexes.front()]);
+      DCHECK(cit != shiftboss_indexes_for_lip_filter_groups_.end());
+      DCHECK_LT(part_id, cit->second.size());
+    }
+#endif
+
     if (!lip_filter_indexes.empty() &&
-        shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]]
+        shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id]
             != kInvalidShiftbossIndex) {
       *shiftboss_index =
-          shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]];
+          shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id];
       return;
     } else if (!block_locator.getBlockLocalityInfo(block, shiftboss_index)) {
       *shiftboss_index = next_shiftboss_index_to_schedule;
     }
 
     if (!lip_filter_indexes.empty()) {
-      shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]] =
+      shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id] =
           *shiftboss_index;
     }
   }
@@ -282,7 +294,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
   std::vector<LipFilterGroupIndex> lip_filter_groups_indexes_;
 
   // From a LipFilterGroupIndex to its scheduled Shiftboss index.
-  std::unordered_map<LipFilterGroupIndex, std::size_t> shiftboss_indexes_for_lip_filter_groups_;
+  std::unordered_map<LipFilterGroupIndex, std::vector<std::size_t>> shiftboss_indexes_for_lip_filter_groups_;
 
   DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
 };