You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 23:58:46 UTC
incubator-quickstep git commit: Added operator and execution support for partitioned nested loops join.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master cbb84b4d6 -> 361a65fa6
Added operator and execution support for partitioned nested loops join.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/361a65fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/361a65fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/361a65fa
Branch: refs/heads/master
Commit: 361a65fa615fcdcfd3f35869b2690d19e8156e86
Parents: cbb84b4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Jun 3 18:59:14 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 18:57:15 2017 -0500
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 25 +-
query_execution/PolicyEnforcerDistributed.cpp | 20 ++
query_execution/PolicyEnforcerDistributed.hpp | 27 ++
query_execution/QueryContext.proto | 2 +
query_execution/QueryManagerDistributed.cpp | 5 +
query_execution/QueryManagerDistributed.hpp | 41 +++
query_optimizer/ExecutionGenerator.cpp | 29 +-
relational_operators/CMakeLists.txt | 2 +
.../NestedLoopsJoinOperator.cpp | 340 ++++++++++---------
.../NestedLoopsJoinOperator.hpp | 83 +++--
relational_operators/WorkOrder.proto | 3 +
11 files changed, 378 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index fbac18e..34b5b76 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -347,22 +347,6 @@ namespace {
constexpr size_t kDefaultShiftbossIndex = 0u;
-bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
- const BlockLocator &block_locator,
- std::size_t *shiftboss_index_for_join) {
- if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
- return false;
- }
-
- const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
- if (block_locator.getBlockLocalityInfo(left_block, shiftboss_index_for_join)) {
- return true;
- }
-
- const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
- return block_locator.getBlockLocalityInfo(right_block, shiftboss_index_for_join);
-}
-
bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
const BlockLocator &block_locator,
std::size_t *shiftboss_index_for_block) {
@@ -401,10 +385,15 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
} else if (isLipRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+ } else if (work_order_proto.work_order_type() == S::NESTED_LOOP_JOIN) {
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForNestedLoopsJoin(
+ proto.query_id(), work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::nested_loops_join_index),
+ work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::partition_id), block_locator_,
+ work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id),
+ work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id),
+ shiftboss_index, &shiftboss_index_for_particular_work_order_type);
} else if (hasBlockLocalityInfo(work_order_proto, block_locator_,
&shiftboss_index_for_particular_work_order_type)) {
- } else if (isNestedLoopsJoinWorkOrder(work_order_proto, block_locator_,
- &shiftboss_index_for_particular_work_order_type)) {
} else {
shiftboss_index_for_particular_work_order_type = shiftboss_index;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 0a4fd30..7e9a81d 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -243,6 +243,26 @@ void PolicyEnforcerDistributed::getShiftbossIndexForLip(
shiftboss_index);
}
+// Picks the Shiftboss for a NestedLoopsJoin WorkOrder of an admitted query.
+// The per-<nested_loops_join_index, part_id> choice is cached in the query's
+// QueryManagerDistributed; this function only looks that manager up and
+// forwards every argument to it unchanged.
+void PolicyEnforcerDistributed::getShiftbossIndexForNestedLoopsJoin(
+ const std::size_t query_id,
+ const std::size_t nested_loops_join_index,
+ const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id left_block,
+ const block_id right_block,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index_for_nested_loops_join) {
+ // The query must already be admitted; this is only checked in debug builds.
+ // NOTE(review): with DCHECK compiled out, an unknown query_id would reach
+ // admitted_queries_[query_id] below, which (assuming a std::map-like
+ // container) default-inserts an empty entry whose get() is null -- confirm
+ // that callers always pass an admitted query_id.
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+ query_manager->getShiftbossIndexForNestedLoopsJoin(nested_loops_join_index,
+ part_id,
+ block_locator,
+ left_block,
+ right_block,
+ next_shiftboss_index_to_schedule,
+ shiftboss_index_for_nested_loops_join);
+}
+
void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
S::QueryInitiateMessage proto;
proto.set_query_id(query_handle->query_id());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index f44fd2e..e24f8cf 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -198,6 +198,33 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index);
+ /**
+ * @brief Get or set the index of Shiftboss for a NestedLoopsJoin related WorkOrder.
+ * If it is the first join on <nested_loops_join_index, part_id>,
+ * <shiftboss_index_for_nested_loops_join> will be set to the block
+ * locality of <left_block> (or, failing that, <right_block>) if found,
+ * otherwise to <next_shiftboss_index_to_schedule>.
+ * Otherwise, <shiftboss_index_for_nested_loops_join> will be set to the
+ * index of the Shiftboss chosen for the first join on that pair, so all
+ * WorkOrders of one partition of this join go to the same Shiftboss.
+ *
+ * @pre <query_id> has been admitted (checked only via DCHECK).
+ *
+ * @param query_id The query id.
+ * @param nested_loops_join_index The index of this NestedLoopsJoin within the
+ * query, i.e. its position in QueryContext's
+ * num_partitions_for_nested_loops_joins.
+ * @param part_id The partition ID.
+ * @param block_locator The BlockLocator to use.
+ * @param left_block The block id of the left side to feed BlockLocator for the locality info.
+ * @param right_block The block id of the right side to feed BlockLocator for the locality info.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index_for_nested_loops_join Output: the index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForNestedLoopsJoin(
+ const std::size_t query_id,
+ const std::size_t nested_loops_join_index,
+ const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id left_block,
+ const block_id right_block,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index_for_nested_loops_join);
+
private:
void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 6dce6b8..599daa7 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -67,4 +67,6 @@ message QueryContext {
repeated WindowAggregationOperationState window_aggregation_states = 12;
required uint64 query_id = 13;
+
+ repeated uint64 num_partitions_for_nested_loops_joins = 14;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index c9780fa..6c293a5 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -83,6 +83,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
vector<size_t>(query_context_proto.join_hash_tables(i).num_partitions(), kInvalidShiftbossIndex));
}
+ for (int i = 0; i < query_context_proto.num_partitions_for_nested_loops_joins_size(); ++i) {
+ shiftboss_indexes_for_nested_loops_joins_.push_back(
+ vector<size_t>(query_context_proto.num_partitions_for_nested_loops_joins(i), kInvalidShiftbossIndex));
+ }
+
computeLipFilterEquivalenceClasses(query_context_proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 0b9b848..6d454cc 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -195,6 +195,43 @@ class QueryManagerDistributed final : public QueryManagerBase {
}
}
+ /**
+ * @brief Get or set the index of Shiftboss for a NestedLoopsJoin related WorkOrder.
+ * If it is the first join on <nested_loops_join_index, part_id>,
+ * <shiftboss_index> will be set to the block locality of <left_block>
+ * (or, failing that, <right_block>) if found,
+ * otherwise to <next_shiftboss_index_to_schedule>.
+ * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss
+ * chosen for the first join on that pair; the choice is cached in
+ * shiftboss_indexes_for_nested_loops_joins_.
+ *
+ * @param nested_loops_join_index The index of this NestedLoopsJoin within the
+ * query, i.e. its position in QueryContext's
+ * num_partitions_for_nested_loops_joins.
+ * @param part_id The partition ID.
+ * @param block_locator The BlockLocator to use.
+ * @param left_block The block id of the left side to feed BlockLocator for the locality info.
+ * @param right_block The block id of the right side to feed BlockLocator for the locality info.
+ * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index Output: the index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForNestedLoopsJoin(const std::size_t nested_loops_join_index,
+ const partition_id part_id,
+ const BlockLocator &block_locator,
+ const block_id left_block,
+ const block_id right_block,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index) {
+ DCHECK_LT(nested_loops_join_index, shiftboss_indexes_for_nested_loops_joins_.size());
+ DCHECK_LT(part_id, shiftboss_indexes_for_nested_loops_joins_[nested_loops_join_index].size());
+
+ // Work directly on the cached slot so BlockLocator writes its locality
+ // result straight into it; the slot starts as kInvalidShiftbossIndex
+ // (set in the constructor).
+ std::size_t *shiftboss_index_for_nested_loops_join =
+ &shiftboss_indexes_for_nested_loops_joins_[nested_loops_join_index][part_id];
+ // Only the first WorkOrder of this (join, partition) pair chooses a
+ // Shiftboss: && short-circuits, so the locality lookups run only while the
+ // slot is unset. Left-block locality wins, then right-block locality, and
+ // only if neither is known does next_shiftboss_index_to_schedule get used.
+ if (*shiftboss_index_for_nested_loops_join == kInvalidShiftbossIndex &&
+ !block_locator.getBlockLocalityInfo(left_block, shiftboss_index_for_nested_loops_join) &&
+ !block_locator.getBlockLocalityInfo(right_block, shiftboss_index_for_nested_loops_join)) {
+ *shiftboss_index_for_nested_loops_join = next_shiftboss_index_to_schedule;
+ }
+
+ *shiftboss_index = *shiftboss_index_for_nested_loops_join;
+ }
+
private:
bool checkNormalExecutionOver(const dag_node_index index) const override {
return (checkAllDependenciesMet(index) &&
@@ -230,6 +267,10 @@ class QueryManagerDistributed final : public QueryManagerBase {
// [QueryContext::join_hash_table_id][partition_id].
std::vector<std::vector<std::size_t>> shiftboss_indexes_for_hash_joins_;
+ // Get the scheduled Shiftboss index given
+ // [nested_loops_join_index][partition_id].
+ std::vector<std::vector<std::size_t>> shiftboss_indexes_for_nested_loops_joins_;
+
typedef std::int64_t LipFilterGroupIndex;
// From an LIP id (QueryContext::lip_filter_id) to its index of the group that
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index d3870df..2dbcf16 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -954,14 +954,35 @@ void ExecutionGenerator::convertNestedLoopsJoin(
const CatalogRelationInfo *left_relation_info =
findRelationInfoOutputByPhysical(physical_plan->left());
+ const CatalogRelation &left_relation = *left_relation_info->relation;
const CatalogRelationInfo *right_relation_info =
findRelationInfoOutputByPhysical(physical_plan->right());
+ const CatalogRelation &right_relation = *right_relation_info->relation;
// FIXME(quickstep-team): Add support for self-join.
- if (left_relation_info->relation == right_relation_info->relation) {
+ if (left_relation.getID() == right_relation.getID()) {
THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet";
}
+ const PartitionScheme *left_partition_scheme = left_relation.getPartitionScheme();
+ const PartitionScheme *right_partition_scheme = right_relation.getPartitionScheme();
+ if (left_partition_scheme && right_partition_scheme) {
+ DCHECK_EQ(left_partition_scheme->getPartitionSchemeHeader().getNumPartitions(),
+ right_partition_scheme->getPartitionSchemeHeader().getNumPartitions());
+ } else if (left_partition_scheme) {
+ LOG(FATAL) << "Left side has partitions, but right does not";
+ } else if (right_partition_scheme) {
+ LOG(FATAL) << "Right side has partitions, but left does not";
+ }
+
+ const std::size_t num_partitions =
+ left_partition_scheme ? left_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+ : 1u;
+
+ const std::size_t nested_loops_join_index =
+ query_context_proto_->num_partitions_for_nested_loops_joins_size();
+ query_context_proto_->add_num_partitions_for_nested_loops_joins(num_partitions);
+
// Create InsertDestination proto.
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
@@ -975,8 +996,10 @@ void ExecutionGenerator::convertNestedLoopsJoin(
const QueryPlan::DAGNodeIndex join_operator_index =
execution_plan_->addRelationalOperator(
new NestedLoopsJoinOperator(query_handle_->query_id(),
- *left_relation_info->relation,
- *right_relation_info->relation,
+ nested_loops_join_index,
+ left_relation,
+ right_relation,
+ num_partitions,
*output_relation,
insert_destination_index,
execution_join_predicate_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 79e0dc7..5b3f009 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -319,6 +319,8 @@ target_link_libraries(quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_queryexecution_QueryContext
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index 4ef2a70..1c0bbec 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -57,9 +57,13 @@ bool NestedLoopsJoinOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
if (left_relation_is_stored_ && right_relation_is_stored_) {
// Make sure we generate workorders only once.
- if (!all_workorders_generated_) {
- for (const block_id left_block_id : left_relation_block_ids_) {
- for (const block_id right_block_id : right_relation_block_ids_) {
+ if (all_workorders_generated_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id left_block_id : left_relation_block_ids_[part_id]) {
+ for (const block_id right_block_id : right_relation_block_ids_[part_id]) {
container->addNormalWorkOrder(
new NestedLoopsJoinWorkOrder(
query_id_,
@@ -75,58 +79,64 @@ bool NestedLoopsJoinOperator::getAllWorkOrders(
op_index_);
}
}
- all_workorders_generated_ = true;
}
- return all_workorders_generated_;
+ all_workorders_generated_ = true;
+ return true;
} else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
// Both relations are not stored.
- std::vector<block_id>::size_type new_left_blocks
- = left_relation_block_ids_.size() - num_left_workorders_generated_;
- std::vector<block_id>::size_type new_right_blocks
- = right_relation_block_ids_.size() - num_right_workorders_generated_;
-
- std::size_t new_workorders = 0;
- if (new_left_blocks > 0 && new_right_blocks > 0) {
- // Blocks added to both left and right relations.
- // First generate (left + new_left_blocks) * (new_right_blocks).
- new_workorders = getAllWorkOrdersHelperBothNotStored(container,
- query_context,
- storage_manager,
- 0,
- left_relation_block_ids_.size(),
- num_right_workorders_generated_,
- right_relation_block_ids_.size());
-
- // Now generate new_left_blocks * (right).
- new_workorders += getAllWorkOrdersHelperBothNotStored(container,
- query_context,
- storage_manager,
- num_left_workorders_generated_,
- left_relation_block_ids_.size(),
- 0,
- num_right_workorders_generated_);
- } else if (new_left_blocks == 0 && new_right_blocks > 0) {
- // Only new right blocks are added. Generate left * new_right_blocks.
- new_workorders = getAllWorkOrdersHelperBothNotStored(container,
- query_context,
- storage_manager,
- 0,
- left_relation_block_ids_.size(),
- num_right_workorders_generated_,
- right_relation_block_ids_.size());
- } else if (new_left_blocks > 0 && new_right_blocks == 0) {
- // Generate new_left_blocks * right
- new_workorders = getAllWorkOrdersHelperBothNotStored(container,
- query_context,
- storage_manager,
- num_left_workorders_generated_,
- left_relation_block_ids_.size(),
- 0,
- right_relation_block_ids_.size());
- }
- if (new_workorders > 0) {
- num_left_workorders_generated_ = left_relation_block_ids_.size();
- num_right_workorders_generated_ = right_relation_block_ids_.size();
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ std::vector<block_id>::size_type new_left_blocks
+ = left_relation_block_ids_[part_id].size() - num_left_workorders_generated_[part_id];
+ std::vector<block_id>::size_type new_right_blocks
+ = right_relation_block_ids_[part_id].size() - num_right_workorders_generated_[part_id];
+
+ std::size_t new_workorders = 0;
+ if (new_left_blocks > 0 && new_right_blocks > 0) {
+ // Blocks added to both left and right relations.
+ // First generate (left + new_left_blocks) * (new_right_blocks).
+ new_workorders = getAllWorkOrdersHelperBothNotStored(container,
+ query_context,
+ storage_manager,
+ part_id,
+ 0,
+ left_relation_block_ids_[part_id].size(),
+ num_right_workorders_generated_[part_id],
+ right_relation_block_ids_[part_id].size());
+
+ // Now generate new_left_blocks * (right).
+ new_workorders += getAllWorkOrdersHelperBothNotStored(container,
+ query_context,
+ storage_manager,
+ part_id,
+ num_left_workorders_generated_[part_id],
+ left_relation_block_ids_[part_id].size(),
+ 0,
+ num_right_workorders_generated_[part_id]);
+ } else if (new_left_blocks == 0 && new_right_blocks > 0) {
+ // Only new right blocks are added. Generate left * new_right_blocks.
+ new_workorders = getAllWorkOrdersHelperBothNotStored(container,
+ query_context,
+ storage_manager,
+ part_id,
+ 0,
+ left_relation_block_ids_[part_id].size(),
+ num_right_workorders_generated_[part_id],
+ right_relation_block_ids_[part_id].size());
+ } else if (new_left_blocks > 0 && new_right_blocks == 0) {
+ // Generate new_left_blocks * right
+ new_workorders = getAllWorkOrdersHelperBothNotStored(container,
+ query_context,
+ storage_manager,
+ part_id,
+ num_left_workorders_generated_[part_id],
+ left_relation_block_ids_[part_id].size(),
+ 0,
+ right_relation_block_ids_[part_id].size());
+ }
+ if (new_workorders > 0) {
+ num_left_workorders_generated_[part_id] = left_relation_block_ids_[part_id].size();
+ num_right_workorders_generated_[part_id] = right_relation_block_ids_[part_id].size();
+ }
}
return done_feeding_left_relation_ && done_feeding_right_relation_;
} else {
@@ -138,61 +148,71 @@ bool NestedLoopsJoinOperator::getAllWorkOrders(
bool NestedLoopsJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (left_relation_is_stored_ && right_relation_is_stored_) {
// Make sure we generate workorders only once.
- if (!all_workorders_generated_) {
- for (const block_id left_block_id : left_relation_block_ids_) {
- for (const block_id right_block_id : right_relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(left_block_id, right_block_id),
+ if (all_workorders_generated_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id left_block_id : left_relation_block_ids_[part_id]) {
+ for (const block_id right_block_id : right_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(createWorkOrderProto(part_id, left_block_id, right_block_id),
op_index_);
}
}
- all_workorders_generated_ = true;
}
+ all_workorders_generated_ = true;
return true;
} else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
// Both relations are not stored.
- const std::vector<block_id>::size_type new_left_blocks
- = left_relation_block_ids_.size() - num_left_workorders_generated_;
- const std::vector<block_id>::size_type new_right_blocks
- = right_relation_block_ids_.size() - num_right_workorders_generated_;
-
- std::size_t new_workorders = 0;
- if (new_left_blocks > 0 && new_right_blocks > 0) {
- // Blocks added to both left and right relations.
- // First generate (left + new_left_blocks) * (new_right_blocks).
- new_workorders =
- getAllWorkOrderProtosHelperBothNotStored(container,
- 0,
- left_relation_block_ids_.size(),
- num_right_workorders_generated_,
- right_relation_block_ids_.size());
-
- // Now generate new_left_blocks * (right).
- new_workorders +=
- getAllWorkOrderProtosHelperBothNotStored(container,
- num_left_workorders_generated_,
- left_relation_block_ids_.size(),
- 0,
- num_right_workorders_generated_);
- } else if (new_left_blocks == 0 && new_right_blocks > 0) {
- // Only new right blocks are added. Generate left * new_right_blocks.
- new_workorders =
- getAllWorkOrderProtosHelperBothNotStored(container,
- 0,
- left_relation_block_ids_.size(),
- num_right_workorders_generated_,
- right_relation_block_ids_.size());
- } else if (new_left_blocks > 0 && new_right_blocks == 0) {
- // Generate new_left_blocks * right
- new_workorders =
- getAllWorkOrderProtosHelperBothNotStored(container,
- num_left_workorders_generated_,
- left_relation_block_ids_.size(),
- 0,
- right_relation_block_ids_.size());
- }
- if (new_workorders > 0) {
- num_left_workorders_generated_ = left_relation_block_ids_.size();
- num_right_workorders_generated_ = right_relation_block_ids_.size();
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ const std::vector<block_id>::size_type new_left_blocks
+ = left_relation_block_ids_[part_id].size() - num_left_workorders_generated_[part_id];
+ const std::vector<block_id>::size_type new_right_blocks
+ = right_relation_block_ids_[part_id].size() - num_right_workorders_generated_[part_id];
+
+ std::size_t new_workorders = 0;
+ if (new_left_blocks > 0 && new_right_blocks > 0) {
+ // Blocks added to both left and right relations.
+ // First generate (left + new_left_blocks) * (new_right_blocks).
+ new_workorders =
+ getAllWorkOrderProtosHelperBothNotStored(container,
+ part_id,
+ 0,
+ left_relation_block_ids_[part_id].size(),
+ num_right_workorders_generated_[part_id],
+ right_relation_block_ids_[part_id].size());
+
+ // Now generate new_left_blocks * (right).
+ new_workorders +=
+ getAllWorkOrderProtosHelperBothNotStored(container,
+ part_id,
+ num_left_workorders_generated_[part_id],
+ left_relation_block_ids_[part_id].size(),
+ 0,
+ num_right_workorders_generated_[part_id]);
+ } else if (new_left_blocks == 0 && new_right_blocks > 0) {
+ // Only new right blocks are added. Generate left * new_right_blocks.
+ new_workorders =
+ getAllWorkOrderProtosHelperBothNotStored(container,
+ part_id,
+ 0,
+ left_relation_block_ids_[part_id].size(),
+ num_right_workorders_generated_[part_id],
+ right_relation_block_ids_[part_id].size());
+ } else if (new_left_blocks > 0 && new_right_blocks == 0) {
+ // Generate new_left_blocks * right
+ new_workorders =
+ getAllWorkOrderProtosHelperBothNotStored(container,
+ part_id,
+ num_left_workorders_generated_[part_id],
+ left_relation_block_ids_[part_id].size(),
+ 0,
+ right_relation_block_ids_[part_id].size());
+ }
+ if (new_workorders > 0) {
+ num_left_workorders_generated_[part_id] = left_relation_block_ids_[part_id].size();
+ num_right_workorders_generated_[part_id] = right_relation_block_ids_[part_id].size();
+ }
}
return done_feeding_left_relation_ && done_feeding_right_relation_;
} else {
@@ -204,6 +224,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *co
std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container,
QueryContext *query_context,
StorageManager *storage_manager,
+ const partition_id part_id,
std::vector<block_id>::size_type left_min,
std::vector<block_id>::size_type left_max,
std::vector<block_id>::size_type right_min,
@@ -223,8 +244,8 @@ std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrd
query_id_,
left_input_relation_,
right_input_relation_,
- left_relation_block_ids_[left_index],
- right_relation_block_ids_[right_index],
+ left_relation_block_ids_[part_id][left_index],
+ right_relation_block_ids_[part_id][right_index],
query_context->getPredicate(join_predicate_index_),
query_context->getScalarGroup(selection_index_),
query_context->getInsertDestination(output_destination_index_),
@@ -249,51 +270,56 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine
query_context->getInsertDestination(output_destination_index_);
if (left_relation_is_stored_) {
- for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
- right_index < right_relation_block_ids_.size();
- ++right_index) {
- for (const block_id left_block_id : left_relation_block_ids_) {
- container->addNormalWorkOrder(
- new NestedLoopsJoinWorkOrder(
- query_id_,
- left_input_relation_,
- right_input_relation_,
- left_block_id,
- right_relation_block_ids_[right_index],
- join_predicate,
- selection,
- output_destination,
- storage_manager),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_[part_id];
+ right_index < right_relation_block_ids_[part_id].size();
+ ++right_index) {
+ for (const block_id left_block_id : left_relation_block_ids_[part_id]) {
+ container->addNormalWorkOrder(
+ new NestedLoopsJoinWorkOrder(
+ query_id_,
+ left_input_relation_,
+ right_input_relation_,
+ left_block_id,
+ right_relation_block_ids_[part_id][right_index],
+ join_predicate,
+ selection,
+ output_destination,
+ storage_manager),
+ op_index_);
+ }
}
+ num_right_workorders_generated_[part_id] = right_relation_block_ids_[part_id].size();
}
- num_right_workorders_generated_ = right_relation_block_ids_.size();
return done_feeding_right_relation_;
} else {
- for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
- left_index < left_relation_block_ids_.size();
- ++left_index) {
- for (const block_id right_block_id : right_relation_block_ids_) {
- container->addNormalWorkOrder(
- new NestedLoopsJoinWorkOrder(query_id_,
- left_input_relation_,
- right_input_relation_,
- left_relation_block_ids_[left_index],
- right_block_id,
- join_predicate,
- selection,
- output_destination,
- storage_manager),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_[part_id];
+ left_index < left_relation_block_ids_[part_id].size();
+ ++left_index) {
+ for (const block_id right_block_id : right_relation_block_ids_[part_id]) {
+ container->addNormalWorkOrder(
+ new NestedLoopsJoinWorkOrder(query_id_,
+ left_input_relation_,
+ right_input_relation_,
+ left_relation_block_ids_[part_id][left_index],
+ right_block_id,
+ join_predicate,
+ selection,
+ output_destination,
+ storage_manager),
+ op_index_);
+ }
}
+ num_left_workorders_generated_[part_id] = left_relation_block_ids_[part_id].size();
}
- num_left_workorders_generated_ = left_relation_block_ids_.size();
return done_feeding_left_relation_;
}
}
std::size_t NestedLoopsJoinOperator::getAllWorkOrderProtosHelperBothNotStored(
WorkOrderProtosContainer *container,
+ const partition_id part_id,
const std::vector<block_id>::size_type left_min,
const std::vector<block_id>::size_type left_max,
const std::vector<block_id>::size_type right_min,
@@ -309,7 +335,8 @@ std::size_t NestedLoopsJoinOperator::getAllWorkOrderProtosHelperBothNotStored(
right_index < right_max;
++right_index) {
container->addWorkOrderProto(
- createWorkOrderProto(left_relation_block_ids_[left_index], right_relation_block_ids_[right_index]),
+ createWorkOrderProto(part_id, left_relation_block_ids_[part_id][left_index],
+ right_relation_block_ids_[part_id][right_index]),
op_index_);
}
}
@@ -321,40 +348,47 @@ bool NestedLoopsJoinOperator::getAllWorkOrderProtosHelperOneStored(WorkOrderProt
DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_);
if (left_relation_is_stored_) {
- for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
- right_index < right_relation_block_ids_.size();
- ++right_index) {
- for (const block_id left_block_id : left_relation_block_ids_) {
- container->addWorkOrderProto(
- createWorkOrderProto(left_block_id, right_relation_block_ids_[right_index]),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_[part_id];
+ right_index < right_relation_block_ids_[part_id].size();
+ ++right_index) {
+ for (const block_id left_block_id : left_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(part_id, left_block_id, right_relation_block_ids_[part_id][right_index]),
+ op_index_);
+ }
}
+ num_right_workorders_generated_[part_id] = right_relation_block_ids_[part_id].size();
}
- num_right_workorders_generated_ = right_relation_block_ids_.size();
return done_feeding_right_relation_;
} else {
- for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
- left_index < left_relation_block_ids_.size();
- ++left_index) {
- for (const block_id right_block_id : right_relation_block_ids_) {
- container->addWorkOrderProto(
- createWorkOrderProto(left_relation_block_ids_[left_index], right_block_id),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_[part_id];
+ left_index < left_relation_block_ids_[part_id].size();
+ ++left_index) {
+ for (const block_id right_block_id : right_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(part_id, left_relation_block_ids_[part_id][left_index], right_block_id),
+ op_index_);
+ }
}
+ num_left_workorders_generated_[part_id] = left_relation_block_ids_[part_id].size();
}
- num_left_workorders_generated_ = left_relation_block_ids_.size();
return done_feeding_left_relation_;
}
}
-serialization::WorkOrder* NestedLoopsJoinOperator::createWorkOrderProto(const block_id left_block,
+serialization::WorkOrder* NestedLoopsJoinOperator::createWorkOrderProto(const partition_id part_id,
+ const block_id left_block,
const block_id right_block) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::NESTED_LOOP_JOIN);
proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::nested_loops_join_index, nested_loops_join_index_);
proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id, left_input_relation_.getID());
proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id, right_input_relation_.getID());
+ proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::partition_id, part_id);
proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id, left_block);
proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id, right_block);
proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 3012114..7938560 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -27,6 +27,8 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -66,10 +68,12 @@ class NestedLoopsJoinOperator : public RelationalOperator {
* @brief Constructor.
*
* @param query_id The ID of the query to which this operator belongs.
+ * @param nested_loops_join_index The ID of this operator.
* @param left_input_relation The first relation in the join (order is not
* actually important).
* @param right_input_relation The second relation in the join (order is not
* actually important).
+ * @param num_partitions The number of partitions.
* @param output_relation The output relation.
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert the join results.
@@ -86,35 +90,61 @@ class NestedLoopsJoinOperator : public RelationalOperator {
**/
NestedLoopsJoinOperator(
const std::size_t query_id,
+ const std::size_t nested_loops_join_index,
const CatalogRelation &left_input_relation,
const CatalogRelation &right_input_relation,
+ const std::size_t num_partitions,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::predicate_id join_predicate_index,
const QueryContext::scalar_group_id selection_index,
- bool left_relation_is_stored,
- bool right_relation_is_stored)
+ const bool left_relation_is_stored,
+ const bool right_relation_is_stored)
: RelationalOperator(query_id),
+ nested_loops_join_index_(nested_loops_join_index),
left_input_relation_(left_input_relation),
right_input_relation_(right_input_relation),
+ num_partitions_(num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
join_predicate_index_(join_predicate_index),
selection_index_(selection_index),
left_relation_is_stored_(left_relation_is_stored),
right_relation_is_stored_(right_relation_is_stored),
- left_relation_block_ids_(left_relation_is_stored
- ? left_input_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- right_relation_block_ids_(right_relation_is_stored
- ? right_input_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_left_workorders_generated_(0),
- num_right_workorders_generated_(0),
+ left_relation_block_ids_(num_partitions),
+ right_relation_block_ids_(num_partitions),
+ num_left_workorders_generated_(num_partitions),
+ num_right_workorders_generated_(num_partitions),
done_feeding_left_relation_(false),
done_feeding_right_relation_(false),
all_workorders_generated_(false) {
DCHECK_NE(join_predicate_index_, QueryContext::kInvalidPredicateId);
+
+ if (left_relation_is_stored) {
+ if (left_input_relation_.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *left_input_relation_.getPartitionScheme();
+ DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions());
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ left_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ DCHECK_EQ(1u, num_partitions_);
+ left_relation_block_ids_[0] = left_input_relation_.getBlocksSnapshot();
+ }
+ }
+
+ if (right_relation_is_stored) {
+ if (right_input_relation_.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *right_input_relation_.getPartitionScheme();
+ DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions());
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ right_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ DCHECK_EQ(1u, num_partitions_);
+ right_relation_block_ids_[0] = right_input_relation_.getBlocksSnapshot();
+ }
+ }
}
~NestedLoopsJoinOperator() override {}
@@ -148,9 +178,9 @@ class NestedLoopsJoinOperator : public RelationalOperator {
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
const partition_id part_id) override {
if (input_relation_id == left_input_relation_.getID()) {
- left_relation_block_ids_.push_back(input_block_id);
+ left_relation_block_ids_[part_id].push_back(input_block_id);
} else if (input_relation_id == right_input_relation_.getID()) {
- right_relation_block_ids_.push_back(input_block_id);
+ right_relation_block_ids_[part_id].push_back(input_block_id);
} else {
LOG(FATAL) << "The input block sent to the NestedLoopsJoinOperator belongs "
<< "to a different relation than the left and right relations";
@@ -175,6 +205,7 @@ class NestedLoopsJoinOperator : public RelationalOperator {
* resulting WorkOrders.
* @param query_context The QueryContext that stores query execution states.
* @param storage_manager The StorageManager to use.
+ * @param part_id The partition ID.
* @param left_min The starting index in left_relation_block_ids_ from where
* we begin generating NestedLoopsJoinWorkOrders.
* @param left_max The index in left_relation_block_ids_ until which we
@@ -190,6 +221,7 @@ class NestedLoopsJoinOperator : public RelationalOperator {
std::size_t getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container,
QueryContext *query_context,
StorageManager *storage_manager,
+ const partition_id part_id,
std::vector<block_id>::size_type left_min,
std::vector<block_id>::size_type left_max,
std::vector<block_id>::size_type right_min,
@@ -232,6 +264,7 @@ class NestedLoopsJoinOperator : public RelationalOperator {
* function.
**/
std::size_t getAllWorkOrderProtosHelperBothNotStored(WorkOrderProtosContainer *container,
+ const partition_id part_id,
const std::vector<block_id>::size_type left_min,
const std::vector<block_id>::size_type left_max,
const std::vector<block_id>::size_type right_min,
@@ -249,17 +282,17 @@ class NestedLoopsJoinOperator : public RelationalOperator {
**/
bool getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container);
- /**
- * @brief Create Work Order proto.
- *
- * @param block The block id used in the Work Order.
- **/
- serialization::WorkOrder* createWorkOrderProto(const block_id left_block,
+ serialization::WorkOrder* createWorkOrderProto(const partition_id part_id,
+ const block_id left_block,
const block_id right_block);
+ const std::size_t nested_loops_join_index_;
+
const CatalogRelation &left_input_relation_;
const CatalogRelation &right_input_relation_;
+ const std::size_t num_partitions_;
+
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
@@ -269,14 +302,14 @@ class NestedLoopsJoinOperator : public RelationalOperator {
const bool left_relation_is_stored_;
const bool right_relation_is_stored_;
- std::vector<block_id> left_relation_block_ids_;
- std::vector<block_id> right_relation_block_ids_;
+ std::vector<BlocksInPartition> left_relation_block_ids_;
+ std::vector<BlocksInPartition> right_relation_block_ids_;
- // At a given point of time, we have paired num_left_workorders_generated
- // number of blocks from the left relation with num_right_workorders_generated
- // number of blocks from the right relation.
- std::vector<block_id>::size_type num_left_workorders_generated_;
- std::vector<block_id>::size_type num_right_workorders_generated_;
+ // At a given point of time, we have paired num_left_workorders_generated[part_id]
+ // number of blocks from the left relation with num_right_workorders_generated[part_id]
+ // number of blocks from the right relation for a given 'part_id'.
+ std::vector<std::size_t> num_left_workorders_generated_;
+ std::vector<std::size_t> num_right_workorders_generated_;
bool done_feeding_left_relation_;
bool done_feeding_right_relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/361a65fa/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index c6715c7..99b4507 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -186,11 +186,14 @@ message InsertWorkOrder {
}
}
+// Next tag: 201.
message NestedLoopsJoinWorkOrder {
extend WorkOrder {
// All required.
+ optional uint64 nested_loops_join_index = 200;
optional int32 left_relation_id = 192;
optional int32 right_relation_id = 193;
+ optional uint64 partition_id = 199;
optional fixed64 left_block_id = 194;
optional fixed64 right_block_id = 195;
optional int32 insert_destination_index = 196;