You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 23:51:19 UTC

incubator-quickstep git commit: Added operator support for BuildLIPFilterOperator with partitioned inputs.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 0f549acb5 -> 7c5d76223


Added operator support for BuildLIPFilterOperator with partitioned inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7c5d7622
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7c5d7622
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7c5d7622

Branch: refs/heads/master
Commit: 7c5d7622332c6684948daa2bc8de7f38488521f7
Parents: 0f549ac
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Jun 14 15:22:13 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 16:10:20 2017 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          | 11 +++-
 relational_operators/BuildLIPFilterOperator.cpp | 64 ++++++++++++--------
 relational_operators/BuildLIPFilterOperator.hpp | 37 ++++++++---
 relational_operators/CMakeLists.txt             |  2 +
 relational_operators/WorkOrder.proto            |  2 +
 5 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index acc2bf1..d3870df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -663,6 +663,14 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
   const CatalogRelationInfo *build_relation_info =
       findRelationInfoOutputByPhysical(build_physical);
 
+  const CatalogRelation &build_relation = *build_relation_info->relation;
+  const PartitionScheme *build_partition_scheme = build_relation.getPartitionScheme();
+
+  const std::size_t build_num_partitions =
+      build_partition_scheme
+          ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+          : 1u;
+
   // Create a BuildLIPFilterOperator for the FilterJoin. This operator builds
   // LIP filters that are applied properly in downstream operators to achieve
   // the filter-join semantics.
@@ -670,7 +678,8 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
       execution_plan_->addRelationalOperator(
           new BuildLIPFilterOperator(
               query_handle_->query_id(),
-              *build_relation_info->relation,
+              build_relation,
+              build_num_partitions,
               build_side_predicate_index,
               build_relation_info->isStoredRelation()));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
index e7e549c..925dfb5 100644
--- a/relational_operators/BuildLIPFilterOperator.cpp
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -55,8 +55,12 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
       query_context->getPredicate(build_side_predicate_index_);
 
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id input_block_id : input_relation_block_ids_) {
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
         container->addNormalWorkOrder(
             new BuildLIPFilterWorkOrder(
                 query_id_,
@@ -68,22 +72,24 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
                 CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
-      started_ = true;
     }
+    started_ = true;
     return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addNormalWorkOrder(
-          new BuildLIPFilterWorkOrder(
-              query_id_,
-              input_relation_,
-              input_relation_block_ids_[num_workorders_generated_],
-              build_side_predicate,
-              storage_manager,
-              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
-              CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addNormalWorkOrder(
+            new BuildLIPFilterWorkOrder(
+                query_id_,
+                input_relation_,
+                input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+                build_side_predicate,
+                storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+                CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
@@ -91,30 +97,38 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
 
 bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id block : input_relation_block_ids_) {
-        container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id block : input_relation_block_ids_[part_id]) {
+        container->addWorkOrderProto(createWorkOrderProto(part_id, block), op_index_);
       }
-      started_ = true;
     }
+    started_ = true;
     return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addWorkOrderProto(
-          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addWorkOrderProto(
+            createWorkOrderProto(part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
 }
 
-serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const partition_id part_id,
+                                                                       const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
   proto->set_query_id(query_id_);
 
   proto->SetExtension(serialization::BuildLIPFilterWorkOrder::relation_id, input_relation_.getID());
+  proto->SetExtension(serialization::BuildLIPFilterWorkOrder::partition_id, part_id);
   proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id, block);
   proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index,
                       build_side_predicate_index_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
index 68bd307..df722ef 100644
--- a/relational_operators/BuildLIPFilterOperator.hpp
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -27,6 +27,8 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -68,6 +70,8 @@ class BuildLIPFilterOperator : public RelationalOperator {
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param input_relation The relation to build LIP filters on.
+   * @param num_partitions The number of partitions in 'input_relation'.
+   *        If 'input_relation' is not partitioned, this is one.
    * @param build_side_predicate_index The index of the predicate in QueryContext
    *        where the predicate is to be applied to the input relation before
    *        building the LIP filters (or kInvalidPredicateId if no predicate is
@@ -78,16 +82,30 @@ class BuildLIPFilterOperator : public RelationalOperator {
    **/
   BuildLIPFilterOperator(const std::size_t query_id,
                          const CatalogRelation &input_relation,
+                         const std::size_t num_partitions,
                          const QueryContext::predicate_id build_side_predicate_index,
                          const bool input_relation_is_stored)
     : RelationalOperator(query_id),
       input_relation_(input_relation),
+      num_partitions_(num_partitions),
       build_side_predicate_index_(build_side_predicate_index),
       input_relation_is_stored_(input_relation_is_stored),
-      input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
-                                                         : std::vector<block_id>()),
-      num_workorders_generated_(0),
-      started_(false) {}
+      input_relation_block_ids_(num_partitions),
+      num_workorders_generated_(num_partitions),
+      started_(false) {
+    if (input_relation_is_stored) {
+      if (input_relation.hasPartitionScheme()) {
+        const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+        DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
+        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+          input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+        }
+      } else {
+        // No partitions.
+        input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+      }
+    }
+  }
 
   ~BuildLIPFilterOperator() override {}
 
@@ -117,23 +135,26 @@ class BuildLIPFilterOperator : public RelationalOperator {
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_.push_back(input_block_id);
+    input_relation_block_ids_[part_id].push_back(input_block_id);
   }
 
  private:
   /**
    * @brief Create Work Order proto.
    *
+   * @param part_id The partition id of 'input_relation_'.
    * @param block The block id used in the Work Order.
    **/
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
 
   const CatalogRelation &input_relation_;
+  const std::size_t num_partitions_;
   const QueryContext::predicate_id build_side_predicate_index_;
   const bool input_relation_is_stored_;
 
-  std::vector<block_id> input_relation_block_ids_;
-  std::vector<block_id>::size_type num_workorders_generated_;
+  // The index is the partition id.
+  std::vector<BlocksInPartition> input_relation_block_ids_;
+  std::vector<std::size_t> num_workorders_generated_;
 
   bool started_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a6a3cd0..79e0dc7 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -150,6 +150,8 @@ target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
                       glog
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index bac2eb0..c6715c7 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -92,10 +92,12 @@ message BuildHashWorkOrder {
   }
 }
 
+// Next tag: 54.
 message BuildLIPFilterWorkOrder {
   extend WorkOrder {
     // All required.
     optional int32 relation_id = 48;
+    optional uint64 partition_id = 53;
     optional fixed64 build_block_id = 49;
     optional int32 build_side_predicate_index = 50;
     optional int32 lip_deployment_index = 51;