You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 23:51:19 UTC
incubator-quickstep git commit: Added operator support for BuildLIPFilter with partitioned inputs.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 0f549acb5 -> 7c5d76223
Added operator support for BuildLipFilter with partitioned inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7c5d7622
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7c5d7622
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7c5d7622
Branch: refs/heads/master
Commit: 7c5d7622332c6684948daa2bc8de7f38488521f7
Parents: 0f549ac
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Jun 14 15:22:13 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 16:10:20 2017 -0500
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 11 +++-
relational_operators/BuildLIPFilterOperator.cpp | 64 ++++++++++++--------
relational_operators/BuildLIPFilterOperator.hpp | 37 ++++++++---
relational_operators/CMakeLists.txt | 2 +
relational_operators/WorkOrder.proto | 2 +
5 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index acc2bf1..d3870df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -663,6 +663,14 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
const CatalogRelationInfo *build_relation_info =
findRelationInfoOutputByPhysical(build_physical);
+ const CatalogRelation &build_relation = *build_relation_info->relation;
+ const PartitionScheme *build_partition_scheme = build_relation.getPartitionScheme();
+
+ const std::size_t build_num_partitions =
+ build_partition_scheme
+ ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+ : 1u;
+
// Create a BuildLIPFilterOperator for the FilterJoin. This operator builds
// LIP filters that are applied properly in downstream operators to achieve
// the filter-join semantics.
@@ -670,7 +678,8 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
execution_plan_->addRelationalOperator(
new BuildLIPFilterOperator(
query_handle_->query_id(),
- *build_relation_info->relation,
+ build_relation,
+ build_num_partitions,
build_side_predicate_index,
build_relation_info->isStoredRelation()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
index e7e549c..925dfb5 100644
--- a/relational_operators/BuildLIPFilterOperator.cpp
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -55,8 +55,12 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
query_context->getPredicate(build_side_predicate_index_);
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
container->addNormalWorkOrder(
new BuildLIPFilterWorkOrder(
query_id_,
@@ -68,22 +72,24 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
op_index_);
}
- started_ = true;
}
+ started_ = true;
return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addNormalWorkOrder(
- new BuildLIPFilterWorkOrder(
- query_id_,
- input_relation_,
- input_relation_block_ids_[num_workorders_generated_],
- build_side_predicate,
- storage_manager,
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
- CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new BuildLIPFilterWorkOrder(
+ query_id_,
+ input_relation_,
+ input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+ build_side_predicate,
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+ CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
@@ -91,30 +97,38 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id block : input_relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id block : input_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(createWorkOrderProto(part_id, block), op_index_);
}
- started_ = true;
}
+ started_ = true;
return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addWorkOrderProto(
- createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
}
-serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const partition_id part_id,
+ const block_id block) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
proto->set_query_id(query_id_);
proto->SetExtension(serialization::BuildLIPFilterWorkOrder::relation_id, input_relation_.getID());
+ proto->SetExtension(serialization::BuildLIPFilterWorkOrder::partition_id, part_id);
proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id, block);
proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index,
build_side_predicate_index_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
index 68bd307..df722ef 100644
--- a/relational_operators/BuildLIPFilterOperator.hpp
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -27,6 +27,8 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -68,6 +70,8 @@ class BuildLIPFilterOperator : public RelationalOperator {
*
* @param query_id The ID of the query to which this operator belongs.
* @param input_relation The relation to build LIP filters on.
+ * @param num_partitions The number of partitions in 'input_relation'.
+ * If no partitions, it is one.
* @param build_side_predicate_index The index of the predicate in QueryContext
* where the predicate is to be applied to the input relation before
* building the LIP filters (or kInvalidPredicateId if no predicate is
@@ -78,16 +82,30 @@ class BuildLIPFilterOperator : public RelationalOperator {
**/
BuildLIPFilterOperator(const std::size_t query_id,
const CatalogRelation &input_relation,
+ const std::size_t num_partitions,
const QueryContext::predicate_id build_side_predicate_index,
const bool input_relation_is_stored)
: RelationalOperator(query_id),
input_relation_(input_relation),
+ num_partitions_(num_partitions),
build_side_predicate_index_(build_side_predicate_index),
input_relation_is_stored_(input_relation_is_stored),
- input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_workorders_generated_(0),
- started_(false) {}
+ input_relation_block_ids_(num_partitions),
+ num_workorders_generated_(num_partitions),
+ started_(false) {
+ if (input_relation_is_stored) {
+ if (input_relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+ DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ // No partitions.
+ input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+ }
+ }
+ }
~BuildLIPFilterOperator() override {}
@@ -117,23 +135,26 @@ class BuildLIPFilterOperator : public RelationalOperator {
void feedInputBlock(const block_id input_block_id,
const relation_id input_relation_id,
const partition_id part_id) override {
- input_relation_block_ids_.push_back(input_block_id);
+ input_relation_block_ids_[part_id].push_back(input_block_id);
}
private:
/**
* @brief Create Work Order proto.
*
+ * @param part_id The partition id of 'input_relation_'.
* @param block The block id used in the Work Order.
**/
- serialization::WorkOrder* createWorkOrderProto(const block_id block);
+ serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
const CatalogRelation &input_relation_;
+ const std::size_t num_partitions_;
const QueryContext::predicate_id build_side_predicate_index_;
const bool input_relation_is_stored_;
- std::vector<block_id> input_relation_block_ids_;
- std::vector<block_id>::size_type num_workorders_generated_;
+ // The index is the partition id.
+ std::vector<BlocksInPartition> input_relation_block_ids_;
+ std::vector<std::size_t> num_workorders_generated_;
bool started_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a6a3cd0..79e0dc7 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -150,6 +150,8 @@ target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
glog
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7c5d7622/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index bac2eb0..c6715c7 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -92,10 +92,12 @@ message BuildHashWorkOrder {
}
}
+// Next tag: 54.
message BuildLIPFilterWorkOrder {
extend WorkOrder {
// All required.
optional int32 relation_id = 48;
+ optional uint64 partition_id = 53;
optional fixed64 build_block_id = 49;
optional int32 build_side_predicate_index = 50;
optional int32 lip_deployment_index = 51;