You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 01:47:39 UTC
incubator-quickstep git commit: Using PartitionSchemeHeader in Physical Plan node.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 13c16b9cb -> 5fbfd2111
Using PartitionSchemeHeader in Physical Plan node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5fbfd211
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5fbfd211
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5fbfd211
Branch: refs/heads/master
Commit: 5fbfd21110091cfaf262daa8d9d2620a4e3f03bb
Parents: 13c16b9
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jun 13 21:50:41 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 20:42:37 2017 -0500
----------------------------------------------------------------------
query_optimizer/CMakeLists.txt | 3 +-
query_optimizer/ExecutionGenerator.cpp | 100 ++++++++++++++++-------
query_optimizer/physical/BinaryJoin.cpp | 7 ++
query_optimizer/physical/BinaryJoin.hpp | 8 +-
query_optimizer/physical/CMakeLists.txt | 8 ++
query_optimizer/physical/HashJoin.cpp | 4 +-
query_optimizer/physical/HashJoin.hpp | 24 ++++--
query_optimizer/physical/Join.hpp | 9 +-
query_optimizer/physical/Physical.hpp | 52 +++++++++++-
query_optimizer/physical/Selection.cpp | 40 ++++++++-
query_optimizer/physical/Selection.hpp | 27 ++++--
query_optimizer/physical/TableGenerator.hpp | 18 +++-
query_optimizer/physical/TableReference.cpp | 55 +++++++++++++
query_optimizer/physical/TableReference.hpp | 21 ++---
query_optimizer/rules/CMakeLists.txt | 4 +-
relational_operators/HashJoinOperator.cpp | 6 ++
relational_operators/HashJoinOperator.hpp | 4 +-
relational_operators/SelectOperator.cpp | 13 +--
relational_operators/SelectOperator.hpp | 10 ++-
relational_operators/WorkOrder.proto | 2 +
relational_operators/WorkOrderFactory.cpp | 9 +-
storage/CMakeLists.txt | 1 +
storage/InsertDestination.cpp | 34 ++------
storage/InsertDestination.hpp | 34 +++++++-
utility/CMakeLists.txt | 1 +
utility/PlanVisualizer.cpp | 6 ++
26 files changed, 394 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index c969f16..04af02c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -103,6 +103,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_queryoptimizer_physical_InsertTuple
quickstep_queryoptimizer_physical_LIPFilterConfiguration
quickstep_queryoptimizer_physical_NestedLoopsJoin
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_PatternMatcher
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_PhysicalType
@@ -211,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_OptimizerTree
target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
${GFLAGS_LIB_NAME}
quickstep_queryoptimizer_LogicalToPhysicalMapper
+ quickstep_queryoptimizer_Validator
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_rules_AttachLIPFilters
@@ -230,7 +232,6 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_strategy_OneToOne
quickstep_queryoptimizer_strategy_Selection
quickstep_queryoptimizer_strategy_Strategy
- quickstep_queryoptimizer_Validator
quickstep_utility_Macros
quickstep_utility_PlanVisualizer)
target_link_libraries(quickstep_queryoptimizer_QueryHandle
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 3b2fe08..a7c7328 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -84,6 +84,7 @@
#include "query_optimizer/physical/InsertTuple.hpp"
#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
#include "query_optimizer/physical/PatternMatcher.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
@@ -140,6 +141,7 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+using std::make_unique;
using std::move;
using std::static_pointer_cast;
using std::unique_ptr;
@@ -363,16 +365,52 @@ void ExecutionGenerator::createTemporaryCatalogRelation(
++aid;
}
+ const P::PartitionSchemeHeader *partition_scheme_header = physical->getOutputPartitionSchemeHeader();
+ if (partition_scheme_header) {
+ PartitionSchemeHeader::PartitionAttributeIds output_partition_attr_ids;
+ for (const auto &partition_equivalent_expr_ids : partition_scheme_header->partition_expr_ids) {
+ DCHECK(!partition_equivalent_expr_ids.empty());
+ const E::ExprId partition_expr_id = *partition_equivalent_expr_ids.begin();
+ DCHECK(attribute_substitution_map_.find(partition_expr_id) != attribute_substitution_map_.end());
+ output_partition_attr_ids.push_back(attribute_substitution_map_[partition_expr_id]->getID());
+ }
+
+ const size_t num_partition = partition_scheme_header->num_partitions;
+ unique_ptr<PartitionSchemeHeader> output_partition_scheme_header;
+ switch (partition_scheme_header->partition_type) {
+ case P::PartitionSchemeHeader::PartitionType::kHash:
+ output_partition_scheme_header =
+ make_unique<HashPartitionSchemeHeader>(num_partition, move(output_partition_attr_ids));
+ break;
+ case P::PartitionSchemeHeader::PartitionType::kRandom:
+ output_partition_scheme_header =
+ make_unique<RandomPartitionSchemeHeader>(num_partition);
+ break;
+ case P::PartitionSchemeHeader::PartitionType::kRange:
+ LOG(FATAL) << "Unimplemented";
+ default:
+ LOG(FATAL) << "Unknown partition type";
+ }
+ auto output_partition_scheme = make_unique<PartitionScheme>(output_partition_scheme_header.release());
+
+ insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+ insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+ ->MergeFrom(output_partition_scheme->getProto());
+
+ catalog_relation->setPartitionScheme(output_partition_scheme.release());
+ } else {
+ insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+ }
+
*catalog_relation_output = catalog_relation.get();
const relation_id output_rel_id = catalog_database_->addRelation(
catalog_relation.release());
+ insert_destination_proto->set_relation_id(output_rel_id);
+
#ifdef QUICKSTEP_DISTRIBUTED
referenced_relation_ids_.insert(output_rel_id);
#endif
-
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(output_rel_id);
}
void ExecutionGenerator::dropAllTemporaryRelations() {
@@ -499,13 +537,28 @@ bool ExecutionGenerator::convertSimpleProjection(
void ExecutionGenerator::convertSelection(
const P::SelectionPtr &physical_selection) {
+ const P::PhysicalPtr input = physical_selection->input();
+ const CatalogRelationInfo *input_relation_info =
+ findRelationInfoOutputByPhysical(input);
+ DCHECK(input_relation_info != nullptr);
+ const CatalogRelation &input_relation = *input_relation_info->relation;
+
// Check if the Selection is only for renaming columns.
if (physical_selection->filter_predicate() == nullptr) {
- const std::vector<E::AttributeReferencePtr> input_attributes =
- physical_selection->input()->getOutputAttributes();
+ const P::PartitionSchemeHeader *physical_select_partition_scheme_header =
+ physical_selection->getOutputPartitionSchemeHeader();
+ const P::PartitionSchemeHeader *physical_input_partition_scheme_header = input->getOutputPartitionSchemeHeader();
+
+ const bool are_same_physical_partition_scheme_headers =
+ (!physical_select_partition_scheme_header && !physical_input_partition_scheme_header) ||
+ (physical_select_partition_scheme_header && physical_input_partition_scheme_header &&
+ physical_select_partition_scheme_header->equal(*physical_input_partition_scheme_header));
+
+ const std::vector<E::AttributeReferencePtr> input_attributes = input->getOutputAttributes();
+
const std::vector<E::NamedExpressionPtr> &project_expressions =
physical_selection->project_expressions();
- if (project_expressions.size() == input_attributes.size()) {
+ if (project_expressions.size() == input_attributes.size() && are_same_physical_partition_scheme_headers) {
bool has_different_attrs = false;
for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) {
if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) {
@@ -514,12 +567,9 @@ void ExecutionGenerator::convertSelection(
}
}
if (!has_different_attrs) {
- const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator input_catalog_rel_it =
- physical_to_output_relation_map_.find(physical_selection->input());
- DCHECK(input_catalog_rel_it != physical_to_output_relation_map_.end());
- if (!input_catalog_rel_it->second.isStoredRelation()) {
+ if (!input_relation_info->isStoredRelation()) {
CatalogRelation *catalog_relation =
- const_cast<CatalogRelation*>(input_catalog_rel_it->second.relation);
+ const_cast<CatalogRelation*>(input_relation_info->relation);
for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) {
CatalogAttribute *catalog_attribute =
catalog_relation->getAttributeByIdMutable(attr_idx);
@@ -528,7 +578,7 @@ void ExecutionGenerator::convertSelection(
project_expressions[attr_idx]->attribute_alias());
}
physical_to_output_relation_map_.emplace(physical_selection,
- input_catalog_rel_it->second);
+ *input_relation_info);
return;
}
}
@@ -560,10 +610,6 @@ void ExecutionGenerator::convertSelection(
insert_destination_proto);
// Create and add a Select operator.
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_selection->input());
- DCHECK(input_relation_info != nullptr);
- const CatalogRelation &input_relation = *input_relation_info->relation;
const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
const std::size_t num_partitions =
@@ -799,17 +845,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
S::QueryContext::HashTableContext *hash_table_context_proto =
query_context_proto_->add_join_hash_tables();
- // No partition.
- std::size_t num_partitions = 1;
- if (build_relation->hasPartitionScheme() &&
- build_attribute_ids.size() == 1) {
- const PartitionSchemeHeader &partition_scheme_header =
- build_relation->getPartitionScheme()->getPartitionSchemeHeader();
- if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeIds().front()) {
- // TODO(zuyu): add optimizer support for partitioned hash joins.
- hash_table_context_proto->set_num_partitions(num_partitions);
- }
- }
+ const P::PartitionSchemeHeader *probe_partition_scheme_header = probe_physical->getOutputPartitionSchemeHeader();
+ const std::size_t probe_num_partitions =
+ probe_partition_scheme_header ? probe_partition_scheme_header->num_partitions : 1u;
+ hash_table_context_proto->set_num_partitions(probe_num_partitions);
+
S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
@@ -837,7 +877,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
build_relation_info->isStoredRelation(),
build_attribute_ids,
any_build_attributes_nullable,
- num_partitions,
+ probe_num_partitions,
join_hash_table_index));
// Create InsertDestination proto.
@@ -880,7 +920,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
probe_operator_info->isStoredRelation(),
probe_attribute_ids,
any_probe_attributes_nullable,
- num_partitions,
+ probe_num_partitions,
*output_relation,
insert_destination_index,
join_hash_table_index,
@@ -892,7 +932,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
const QueryPlan::DAGNodeIndex destroy_operator_index =
execution_plan_->addRelationalOperator(new DestroyHashOperator(
- query_handle_->query_id(), num_partitions, join_hash_table_index));
+ query_handle_->query_id(), probe_num_partitions, join_hash_table_index));
if (!build_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(build_operator_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/BinaryJoin.cpp b/query_optimizer/physical/BinaryJoin.cpp
index 30e2e8d..1928198 100644
--- a/query_optimizer/physical/BinaryJoin.cpp
+++ b/query_optimizer/physical/BinaryJoin.cpp
@@ -23,6 +23,8 @@
#include <vector>
#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
#include "utility/Cast.hpp"
namespace quickstep {
@@ -38,6 +40,11 @@ void BinaryJoin::getFieldStringItems(
std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
std::vector<std::string> *container_child_field_names,
std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+ if (partition_scheme_header_) {
+ inline_field_names->push_back("output_partition_scheme_header");
+ inline_field_values->push_back(partition_scheme_header_->toString());
+ }
+
non_container_child_field_names->push_back("left");
non_container_child_field_names->push_back("right");
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/BinaryJoin.hpp b/query_optimizer/physical/BinaryJoin.hpp
index 4b5671d..d1f652f 100644
--- a/query_optimizer/physical/BinaryJoin.hpp
+++ b/query_optimizer/physical/BinaryJoin.hpp
@@ -41,6 +41,8 @@ namespace physical {
class BinaryJoin;
typedef std::shared_ptr<const BinaryJoin> BinaryJoinPtr;
+struct PartitionSchemeHeader;
+
/**
* @brief Base class for binary join nodes.
*/
@@ -68,11 +70,13 @@ class BinaryJoin : public Join {
* @param left The left operand.
* @param right The right operand.
* @param project_expressions The project expressions.
+ * @param partition_scheme_header The optional output partition scheme header.
*/
BinaryJoin(const PhysicalPtr &left,
const PhysicalPtr &right,
- const std::vector<expressions::NamedExpressionPtr> &project_expressions)
- : Join(project_expressions),
+ const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+ PartitionSchemeHeader *partition_scheme_header = nullptr)
+ : Join(project_expressions, partition_scheme_header),
left_(left),
right_(right) {
addChild(left_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index 1e777b1..e0b1d25 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -67,6 +67,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_BinaryJoin
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_NamedExpression
quickstep_queryoptimizer_physical_Join
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_Physical
quickstep_utility_Cast
quickstep_utility_Macros)
@@ -202,9 +203,11 @@ target_link_libraries(quickstep_queryoptimizer_physical_PartitionSchemeHeader
target_link_libraries(quickstep_queryoptimizer_physical_PatternMatcher
quickstep_queryoptimizer_physical_PhysicalType)
target_link_libraries(quickstep_queryoptimizer_physical_Physical
+ glog
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_physical_Sample
@@ -225,6 +228,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_Selection
quickstep_queryoptimizer_expressions_LogicalAnd
quickstep_queryoptimizer_expressions_NamedExpression
quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Cast
@@ -254,6 +258,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ExprId
quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Cast
@@ -261,10 +266,13 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator
target_link_libraries(quickstep_queryoptimizer_physical_TableReference
glog
quickstep_catalog_CatalogRelation
+ quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ExpressionUtil
quickstep_queryoptimizer_expressions_NamedExpression
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_PhysicalType
quickstep_utility_Cast
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index e186072..b0ee913 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -27,6 +27,7 @@
#include "query_optimizer/expressions/ExpressionUtil.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
#include "utility/Cast.hpp"
namespace quickstep {
@@ -79,7 +80,8 @@ bool HashJoin::maybeCopyWithPrunedExpressions(
right_join_attributes_,
residual_predicate_,
new_project_expressions,
- join_type_);
+ join_type_,
+ cloneOutputPartitionSchemeHeader());
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index c513f77..1804d4b 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -49,6 +49,8 @@ namespace physical {
class HashJoin;
typedef std::shared_ptr<const HashJoin> HashJoinPtr;
+struct PartitionSchemeHeader;
+
/**
* @brief Physical hash join node.
*/
@@ -116,11 +118,18 @@ class HashJoin : public BinaryJoin {
right_join_attributes_,
residual_predicate_,
project_expressions(),
- join_type_);
+ join_type_,
+ cloneOutputPartitionSchemeHeader());
}
std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+ PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+ PartitionSchemeHeader *partition_scheme_header) const override {
+ return Create(left(), right(), left_join_attributes_, right_join_attributes_,
+ residual_predicate_, project_expressions(), join_type_, partition_scheme_header);
+ }
+
bool maybeCopyWithPrunedExpressions(
const expressions::UnorderedNamedExpressionSet &referenced_expressions,
PhysicalPtr *output) const override;
@@ -136,6 +145,8 @@ class HashJoin : public BinaryJoin {
* @param residual_predicate Optional filtering predicate evaluated after join.
* @param project_expressions The project expressions.
* @param Join type of this hash join.
+ * @param partition_scheme_header The optional output partition scheme header.
+ *
* @return An immutable physical HashJoin.
*/
static HashJoinPtr Create(
@@ -145,7 +156,8 @@ class HashJoin : public BinaryJoin {
const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
const expressions::PredicatePtr &residual_predicate,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
- const JoinType join_type) {
+ const JoinType join_type,
+ PartitionSchemeHeader *partition_scheme_header = nullptr) {
return HashJoinPtr(
new HashJoin(left,
right,
@@ -153,7 +165,8 @@ class HashJoin : public BinaryJoin {
right_join_attributes,
residual_predicate,
project_expressions,
- join_type));
+ join_type,
+ partition_scheme_header));
}
protected:
@@ -173,8 +186,9 @@ class HashJoin : public BinaryJoin {
const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
const expressions::PredicatePtr &residual_predicate,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
- const JoinType join_type)
- : BinaryJoin(left, right, project_expressions),
+ const JoinType join_type,
+ PartitionSchemeHeader *partition_scheme_header)
+ : BinaryJoin(left, right, project_expressions, partition_scheme_header),
left_join_attributes_(left_join_attributes),
right_join_attributes_(right_join_attributes),
residual_predicate_(residual_predicate),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Join.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Join.hpp b/query_optimizer/physical/Join.hpp
index 305aa52..d72d072 100644
--- a/query_optimizer/physical/Join.hpp
+++ b/query_optimizer/physical/Join.hpp
@@ -40,6 +40,8 @@ namespace physical {
class Join;
typedef std::shared_ptr<const Join> JoinPtr;
+struct PartitionSchemeHeader;
+
/**
* @brief Base class for physical join nodes.
*/
@@ -68,10 +70,13 @@ class Join : public Physical {
* @brief Constructor.
*
* @param project_expressions The project expressions.
+ * @param partition_scheme_header The optional output partition scheme header.
*/
explicit Join(
- const std::vector<expressions::NamedExpressionPtr>& project_expressions)
- : project_expressions_(project_expressions) {}
+ const std::vector<expressions::NamedExpressionPtr>& project_expressions,
+ PartitionSchemeHeader *partition_scheme_header = nullptr)
+ : Physical(partition_scheme_header),
+ project_expressions_(project_expressions) {}
private:
std::vector<expressions::NamedExpressionPtr> project_expressions_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 4bed593..2279a84 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -26,10 +26,12 @@
#include "query_optimizer/OptimizerTree.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
-
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
namespace quickstep {
namespace optimizer {
namespace physical {
@@ -86,11 +88,57 @@ class Physical : public OptimizerTree<Physical> {
const expressions::UnorderedNamedExpressionSet &referenced_expressions,
PhysicalPtr *output) const = 0;
+ /**
+ * @brief Creates a copy with the partition scheme header replaced by \p
+ * partition_scheme_header.
+ *
+ * @param partition_scheme_header The partition scheme header to be
+ * substituted for the existing one, if any. It takes ownership of
+ * 'partition_scheme_header'.
+ *
+ * @return A copy with \p partition_scheme_header as the partition scheme
+ * header.
+ */
+ virtual PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+ PartitionSchemeHeader *partition_scheme_header) const {
+ std::unique_ptr<PartitionSchemeHeader> new_partition_scheme_header(partition_scheme_header);
+ LOG(FATAL) << "copyWithNewOutputPartitionSchemeHeader is not implemented for " << getName();
+ }
+
+ /**
+ * @brief Get the partition scheme of the physical plan node.
+ *
+ * @return A const pointer to the partition scheme of the node.
+ **/
+ const PartitionSchemeHeader* getOutputPartitionSchemeHeader() const {
+ return partition_scheme_header_.get();
+ }
+
protected:
/**
* @brief Constructor.
+ *
+ * @param partition_scheme_header The partition scheme header of the relation.
+ * The constructor takes ownership of 'partition_scheme_header'.
*/
- Physical() {}
+ explicit Physical(PartitionSchemeHeader *partition_scheme_header = nullptr)
+ : partition_scheme_header_(partition_scheme_header) {}
+
+ /**
+ * @brief Clone a copy of the partition scheme header.
+ *
+ * @return A copy of the partition scheme header. Caller should take ownership
+ * of the returned object.
+ **/
+ PartitionSchemeHeader* cloneOutputPartitionSchemeHeader() const {
+ if (partition_scheme_header_) {
+ return new PartitionSchemeHeader(*partition_scheme_header_);
+ }
+
+ return nullptr;
+ }
+
+ std::unique_ptr<PartitionSchemeHeader> partition_scheme_header_;
private:
DISALLOW_COPY_AND_ASSIGN(Physical);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp
index 36ade04..e2fa69e 100644
--- a/query_optimizer/physical/Selection.cpp
+++ b/query_optimizer/physical/Selection.cpp
@@ -19,27 +19,58 @@
#include "query_optimizer/physical/Selection.hpp"
+#include <memory>
#include <string>
+#include <unordered_set>
#include <vector>
#include "query_optimizer/OptimizerTree.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExpressionUtil.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
#include "utility/Cast.hpp"
#include "glog/logging.h"
+using std::unordered_set;
+
namespace quickstep {
namespace optimizer {
namespace physical {
namespace E = ::quickstep::optimizer::expressions;
+SelectionPtr Selection::Create(
+ const PhysicalPtr &input,
+ const std::vector<E::NamedExpressionPtr> &project_expressions,
+ const E::PredicatePtr &filter_predicate,
+ PartitionSchemeHeader *output_partition_scheme_header) {
+ std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(output_partition_scheme_header);
+
+ if (!partition_scheme_header) {
+ const PartitionSchemeHeader *input_partition_scheme_header = input->getOutputPartitionSchemeHeader();
+ if (input_partition_scheme_header) {
+ unordered_set<E::ExprId> project_expr_ids;
+ for (const E::NamedExpressionPtr &project_expression : project_expressions) {
+ project_expr_ids.insert(project_expression->id());
+ }
+
+ if (input_partition_scheme_header->reusablePartitionScheme(project_expr_ids)) {
+ partition_scheme_header = std::make_unique<PartitionSchemeHeader>(*input_partition_scheme_header);
+ }
+ }
+ }
+
+ return SelectionPtr(
+ new Selection(input, project_expressions, filter_predicate, partition_scheme_header.release()));
+}
+
PhysicalPtr Selection::copyWithNewChildren(
const std::vector<PhysicalPtr> &new_children) const {
DCHECK_EQ(children().size(), new_children.size());
- return Create(new_children[0], project_expressions_, filter_predicate_);
+ return Create(new_children[0], project_expressions_, filter_predicate_, cloneOutputPartitionSchemeHeader());
}
std::vector<E::AttributeReferencePtr> Selection::getOutputAttributes() const {
@@ -76,7 +107,7 @@ bool Selection::maybeCopyWithPrunedExpressions(
}
}
if (new_project_expressions.size() != project_expressions_.size()) {
- *output = Create(input(), new_project_expressions, filter_predicate_);
+ *output = Create(input(), new_project_expressions, filter_predicate_, cloneOutputPartitionSchemeHeader());
return true;
}
return false;
@@ -89,6 +120,11 @@ void Selection::getFieldStringItems(
std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
std::vector<std::string> *container_child_field_names,
std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+ if (partition_scheme_header_) {
+ inline_field_names->push_back("output_partition_scheme_header");
+ inline_field_values->push_back(partition_scheme_header_->toString());
+ }
+
non_container_child_field_names->emplace_back("input");
non_container_child_fields->emplace_back(input());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index b6874a1..204eb2f 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -42,6 +42,8 @@ namespace physical {
* @{
*/
+struct PartitionSchemeHeader;
+
class Selection;
typedef std::shared_ptr<const Selection> SelectionPtr;
@@ -82,6 +84,11 @@ class Selection : public Physical {
std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+ PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+ PartitionSchemeHeader *partition_scheme_header) const override {
+ return Create(input(), project_expressions_, filter_predicate_, partition_scheme_header);
+ }
+
bool maybeCopyWithPrunedExpressions(
const expressions::UnorderedNamedExpressionSet &referenced_attributes,
PhysicalPtr *output) const override;
@@ -92,15 +99,17 @@ class Selection : public Physical {
* @param input The input node.
* @param project_expressions The project expressions.
* @param filter_predicate The filter predicate. Can be NULL.
+ * @param output_partition_scheme_header The partition scheme header that
+ * overwrites that from input, if not NULL. It takes ownership of
+ * 'output_partition_scheme_header'.
+ *
* @return An immutable Selection.
*/
static SelectionPtr Create(
const PhysicalPtr &input,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
- const expressions::PredicatePtr &filter_predicate) {
- return SelectionPtr(
- new Selection(input, project_expressions, filter_predicate));
- }
+ const expressions::PredicatePtr &filter_predicate,
+ PartitionSchemeHeader *output_partition_scheme_header = nullptr);
/**
* @brief Creates a conjunctive predicate with \p filter_predicates
@@ -140,15 +149,17 @@ class Selection : public Physical {
Selection(
const PhysicalPtr &input,
const std::vector<expressions::NamedExpressionPtr> &project_expressions,
- const expressions::PredicatePtr &filter_predicate)
- : project_expressions_(project_expressions),
+ const expressions::PredicatePtr &filter_predicate,
+ PartitionSchemeHeader *partition_scheme_header)  // May be NULL; handed to the Physical base.
+ : Physical(partition_scheme_header),
+ project_expressions_(project_expressions),
filter_predicate_(filter_predicate) {
addChild(input);
}
- std::vector<expressions::NamedExpressionPtr> project_expressions_;
+ // Members are const: a Selection is immutable once created.
+ const std::vector<expressions::NamedExpressionPtr> project_expressions_;
// Can be NULL. If NULL, the filter predicate is treated as the literal true.
- expressions::PredicatePtr filter_predicate_;
+ const expressions::PredicatePtr filter_predicate_;
DISALLOW_COPY_AND_ASSIGN(Selection);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableGenerator.hpp b/query_optimizer/physical/TableGenerator.hpp
index c9ff8a8..4d6419f 100644
--- a/query_optimizer/physical/TableGenerator.hpp
+++ b/query_optimizer/physical/TableGenerator.hpp
@@ -29,6 +29,7 @@
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "utility/Cast.hpp"
@@ -100,6 +101,12 @@ class TableGenerator : public Physical {
return {};
}
+ // Clones this generator node (same generator function, alias and output
+ // attributes) but with 'partition_scheme_header' as its output partition
+ // scheme header, which is handed to the new node's Physical base.
+ PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+ PartitionSchemeHeader *partition_scheme_header) const override {
+ TableGenerator *const copy =
+ new TableGenerator(generator_function_handle_, table_alias_, attribute_list_, partition_scheme_header);
+ return TableGeneratorPtr(copy);
+ }
+
void getFieldStringItems(
std::vector<std::string> *inline_field_names,
std::vector<std::string> *inline_field_values,
@@ -116,6 +123,11 @@ class TableGenerator : public Physical {
inline_field_values->push_back(table_alias_);
}
+ if (partition_scheme_header_) {
+ inline_field_names->push_back("output_partition_scheme_header");
+ inline_field_values->push_back(partition_scheme_header_->toString());
+ }
+
container_child_field_names->push_back("");
container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(attribute_list_));
}
@@ -139,8 +151,10 @@ class TableGenerator : public Physical {
private:
+ // 'partition_scheme_header' (default NULL) is forwarded to the Physical base.
TableGenerator(const GeneratorFunctionHandlePtr &generator_function_handle,
const std::string &table_alias,
- const std::vector<E::AttributeReferencePtr> &attribute_list)
- : generator_function_handle_(generator_function_handle),
+ const std::vector<E::AttributeReferencePtr> &attribute_list,
+ PartitionSchemeHeader *partition_scheme_header = nullptr)
+ : Physical(partition_scheme_header),
+ generator_function_handle_(generator_function_handle),
table_alias_(table_alias),
attribute_list_(attribute_list) {
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.cpp b/query_optimizer/physical/TableReference.cpp
index bfd6464..cd6bba4 100644
--- a/query_optimizer/physical/TableReference.cpp
+++ b/query_optimizer/physical/TableReference.cpp
@@ -19,11 +19,17 @@
#include "query_optimizer/physical/TableReference.hpp"
+#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
#include "utility/Cast.hpp"
namespace quickstep {
@@ -32,6 +38,50 @@ namespace physical {
namespace E = ::quickstep::optimizer::expressions;
+TableReferencePtr TableReference::Create(
+ const CatalogRelation *relation,
+ const std::string &alias,
+ const std::vector<expressions::AttributeReferencePtr> &attribute_list) {
+ // Mirror the catalog relation's partitioning (if any) as the output
+ // partition scheme header of this physical node.
+ std::unique_ptr<PartitionSchemeHeader> output_partition_scheme_header;
+ const quickstep::PartitionScheme *partition_scheme = relation->getPartitionScheme();
+ if (partition_scheme) {
+ const quickstep::PartitionSchemeHeader &partition_scheme_header = partition_scheme->getPartitionSchemeHeader();
+
+ PartitionSchemeHeader::PartitionType physical_partition_type;
+ switch (partition_scheme_header.getPartitionType()) {
+ case quickstep::PartitionSchemeHeader::PartitionType::kHash:
+ physical_partition_type = PartitionSchemeHeader::PartitionType::kHash;
+ break;
+ case quickstep::PartitionSchemeHeader::PartitionType::kRange:
+ physical_partition_type = PartitionSchemeHeader::PartitionType::kRange;
+ break;
+ default:
+ // Returning a NULL TableReferencePtr from a factory would only surface
+ // later as a null dereference far from the cause; fail loudly instead.
+ LOG(FATAL) << "Unsupported partition type in TableReference::Create";
+ }
+
+ // NOTE(review): assumes attribute_list is positionally aligned with the
+ // relation's attribute ids (attribute_list[i] describes attribute i).
+ PartitionSchemeHeader::PartitionExprIds partition_expr_ids;
+ for (const attribute_id part_attr : partition_scheme_header.getPartitionAttributeIds()) {
+ partition_expr_ids.push_back({ attribute_list[part_attr]->id() });
+ }
+
+ output_partition_scheme_header =
+ std::make_unique<PartitionSchemeHeader>(physical_partition_type,
+ partition_scheme_header.getNumPartitions(),
+ std::move(partition_expr_ids));
+ }
+
+ // The (possibly NULL) header is released to the new node.
+ return TableReferencePtr(new TableReference(relation, alias, attribute_list,
+ output_partition_scheme_header.release()));
+}
+
+PhysicalPtr TableReference::copyWithNewChildren(
+ const std::vector<PhysicalPtr> &new_children) const {
+ DCHECK_EQ(new_children.size(), children().size());
+ // The copy is given cloneOutputPartitionSchemeHeader() as its output
+ // partition scheme header (presumably a fresh copy, or NULL if none).
+ return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_,
+ cloneOutputPartitionSchemeHeader()));
+}
+
void TableReference::getFieldStringItems(
std::vector<std::string> *inline_field_names,
std::vector<std::string> *inline_field_values,
@@ -47,6 +97,11 @@ void TableReference::getFieldStringItems(
inline_field_values->push_back(alias_);
}
+ if (partition_scheme_header_) {
+ inline_field_names->push_back("output_partition_scheme_header");
+ inline_field_values->push_back(partition_scheme_header_->toString());
+ }
+
container_child_field_names->push_back("");
container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(attribute_list_));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.hpp b/query_optimizer/physical/TableReference.hpp
index 638d73b..bdc0578 100644
--- a/query_optimizer/physical/TableReference.hpp
+++ b/query_optimizer/physical/TableReference.hpp
@@ -20,7 +20,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
#include <memory>
#include <string>
#include <vector>
@@ -45,6 +45,7 @@ namespace physical {
* @{
*/
+struct PartitionSchemeHeader;
class TableReference;
typedef std::shared_ptr<const TableReference> TableReferencePtr;
@@ -70,10 +70,7 @@ class TableReference : public Physical {
}
+ // Defined in TableReference.cpp; the copy is given
+ // cloneOutputPartitionSchemeHeader() as its output partition scheme header.
PhysicalPtr copyWithNewChildren(
- const std::vector<PhysicalPtr> &new_children) const override {
- DCHECK_EQ(new_children.size(), children().size());
- return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_));
- }
+ const std::vector<PhysicalPtr> &new_children) const override;
std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
return attribute_list_;
@@ -101,9 +98,7 @@ class TableReference : public Physical {
static TableReferencePtr Create(
const CatalogRelation *relation,
const std::string &alias,
- const std::vector<expressions::AttributeReferencePtr> &attribute_list) {
- return TableReferencePtr(new TableReference(relation, alias, attribute_list));
- }
+ const std::vector<expressions::AttributeReferencePtr> &attribute_list);
+ // Create() is defined in TableReference.cpp; when 'relation' has a
+ // PartitionScheme, it also builds the node's output partition scheme header.
protected:
void getFieldStringItems(
@@ -117,14 +112,16 @@ class TableReference : public Physical {
private:
TableReference(
const CatalogRelation *relation, const std::string &alias,
- const std::vector<expressions::AttributeReferencePtr> &attribute_list)
- : relation_(relation),
+ const std::vector<expressions::AttributeReferencePtr> &attribute_list,
+ PartitionSchemeHeader *partition_scheme_header)  // May be NULL; handed to the Physical base.
+ : Physical(partition_scheme_header),
+ relation_(relation),
alias_(alias),
attribute_list_(attribute_list) {}
const CatalogRelation *relation_;
- std::string alias_;
- std::vector<expressions::AttributeReferencePtr> attribute_list_;
+ const std::string alias_;
+ const std::vector<expressions::AttributeReferencePtr> attribute_list_;
DISALLOW_COPY_AND_ASSIGN(TableReference);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 70302be..1ad8f40 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -391,5 +391,5 @@ target_link_libraries(quickstep_queryoptimizer_rules
quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
quickstep_queryoptimizer_rules_SwapProbeBuild
quickstep_queryoptimizer_rules_TopDownRule
- quickstep_queryoptimizer_rules_UpdateExpression
- quickstep_queryoptimizer_rules_UnnestSubqueries)
+ quickstep_queryoptimizer_rules_UnnestSubqueries
+ quickstep_queryoptimizer_rules_UpdateExpression)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 70bb185..d3e7e08 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -463,6 +463,8 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
void HashInnerJoinWorkOrder::execute() {
+ output_destination_->setInputPartitionId(part_id_);
+
BlockReference probe_block(
storage_manager_->getBlock(block_id_, probe_relation_));
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -694,6 +696,8 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
}
void HashSemiJoinWorkOrder::execute() {
+ output_destination_->setInputPartitionId(part_id_);
+
if (residual_predicate_ == nullptr) {
executeWithoutResidualPredicate();
} else {
@@ -1018,6 +1022,8 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
}
void HashOuterJoinWorkOrder::execute() {
+ output_destination_->setInputPartitionId(part_id_);
+
const relation_id build_relation_id = build_relation_.getID();
const relation_id probe_relation_id = probe_relation_.getID();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 6391847..23267f8 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/HashTable.hpp"
+#include "storage/InsertDestination.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
@@ -48,7 +49,6 @@ namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogRelationSchema;
-class InsertDestination;
class Predicate;
class Scalar;
class StorageManager;
@@ -712,6 +712,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
~HashAntiJoinWorkOrder() override {}
void execute() override {
+ output_destination_->setInputPartitionId(part_id_);
+
if (residual_predicate_ == nullptr) {
executeWithoutResidualPredicate();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 935b104..845c563 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -75,7 +75,7 @@ bool SelectOperator::getAllWorkOrders(
}
#endif // QUICKSTEP_HAVE_LIBNUMA
container->addNormalWorkOrder(
- new SelectWorkOrder(query_id_, input_relation_, input_block_id, predicate, simple_projection_,
+ new SelectWorkOrder(query_id_, input_relation_, part_id, input_block_id, predicate, simple_projection_,
simple_selection_, selection, output_destination, storage_manager,
CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
op_index_);
@@ -95,7 +95,7 @@ bool SelectOperator::getAllWorkOrders(
}
#endif // QUICKSTEP_HAVE_LIBNUMA
container->addNormalWorkOrder(
- new SelectWorkOrder(query_id_, input_relation_, block, predicate, simple_projection_,
+ new SelectWorkOrder(query_id_, input_relation_, part_id, block, predicate, simple_projection_,
simple_selection_, selection, output_destination, storage_manager,
CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
op_index_);
@@ -114,7 +114,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
- container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+ container->addWorkOrderProto(createWorkOrderProto(part_id, input_block_id), op_index_);
}
}
started_ = true;
@@ -123,7 +123,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
container->addWorkOrderProto(
- createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+ createWorkOrderProto(part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
op_index_);
++num_workorders_generated_[part_id];
}
@@ -132,7 +132,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
}
}
-serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* SelectOperator::createWorkOrderProto(const partition_id part_id, const block_id block) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::SELECT);
proto->set_query_id(query_id_);
@@ -140,6 +140,7 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
proto->SetExtension(serialization::SelectWorkOrder::relation_id, input_relation_.getID());
proto->SetExtension(serialization::SelectWorkOrder::insert_destination_index, output_destination_index_);
proto->SetExtension(serialization::SelectWorkOrder::predicate_index, predicate_index_);
+ proto->SetExtension(serialization::SelectWorkOrder::partition_id, part_id);
proto->SetExtension(serialization::SelectWorkOrder::block_id, block);
proto->SetExtension(serialization::SelectWorkOrder::simple_projection, simple_projection_);
if (simple_projection_) {
@@ -158,6 +159,8 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
}
void SelectWorkOrder::execute() {
+ output_destination_->setInputPartitionId(part_id_);
+
BlockReference block(
storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index b8161b7..c11d681 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -226,9 +226,10 @@ class SelectOperator : public RelationalOperator {
/**
* @brief Create Work Order proto.
*
+ * @param part_id The id of the input partition that \p block belongs to.
* @param block The block id used in the Work Order.
**/
- serialization::WorkOrder* createWorkOrderProto(const block_id block);
+ serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
@@ -268,6 +269,7 @@ class SelectWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param input_relation The relation to perform selection over.
+ * @param part_id The partition id.
* @param input_block_id The block id.
* @param predicate All tuples matching \c predicate will be selected (or NULL
* to select all tuples).
@@ -283,6 +285,7 @@ class SelectWorkOrder : public WorkOrder {
**/
SelectWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
+ const partition_id part_id,
const block_id input_block_id,
const Predicate *predicate,
const bool simple_projection,
@@ -294,6 +297,7 @@ class SelectWorkOrder : public WorkOrder {
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
+ part_id_(part_id),
input_block_id_(input_block_id),
predicate_(predicate),
simple_projection_(simple_projection),
@@ -313,6 +317,7 @@ class SelectWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param input_relation The relation to perform selection over.
+ * @param part_id The partition id.
* @param input_block_id The block id.
* @param predicate All tuples matching \c predicate will be selected (or NULL
* to select all tuples).
@@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder {
**/
SelectWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
+ const partition_id part_id,
const block_id input_block_id,
const Predicate *predicate,
const bool simple_projection,
@@ -339,6 +345,7 @@ class SelectWorkOrder : public WorkOrder {
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
+ part_id_(part_id),
input_block_id_(input_block_id),
predicate_(predicate),
simple_projection_(simple_projection),
@@ -364,6 +371,7 @@ class SelectWorkOrder : public WorkOrder {
private:
const CatalogRelationSchema &input_relation_;
+ // Input partition of input_block_id_; execute() passes it to the output
+ // destination via setInputPartitionId().
+ const partition_id part_id_;
const block_id input_block_id_;
const Predicate *predicate_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 18f0589..42a0e7d 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -238,12 +238,14 @@ message SaveBlocksWorkOrder {
}
}
+// Next tag: 250.
message SelectWorkOrder {
extend WorkOrder {
// All required.
optional int32 relation_id = 240;
optional int32 insert_destination_index = 241;
optional int32 predicate_index = 242;
+ optional uint64 partition_id = 249;
optional fixed64 block_id = 243;
optional bool simple_projection = 244;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 48bf956..2b0bae4 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
storage_manager);
}
case serialization::SELECT: {
- LOG(INFO) << "Creating SelectWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+ const partition_id part_id =
+ proto.GetExtension(serialization::SelectWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating SelectWorkOrder (Partition " << part_id << ") for Query " << query_id
+ << " in Shiftboss " << shiftboss_index;
+
const bool simple_projection =
proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
vector<attribute_id> simple_selection;
@@ -447,6 +452,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
query_id,
catalog_database->getRelationSchemaById(
proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
+ part_id,
proto.GetExtension(serialization::SelectWorkOrder::block_id),
query_context->getPredicate(
proto.GetExtension(serialization::SelectWorkOrder::predicate_index)),
@@ -913,6 +919,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.HasExtension(serialization::SelectWorkOrder::predicate_index) &&
query_context.isValidPredicate(
proto.GetExtension(serialization::SelectWorkOrder::predicate_index)) &&
+ proto.HasExtension(serialization::SelectWorkOrder::partition_id) &&
proto.HasExtension(serialization::SelectWorkOrder::block_id);
}
case serialization::SORT_MERGE_RUN: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 4296ba0..f33a4f4 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -776,6 +776,7 @@ target_link_libraries(quickstep_storage_InsertDestination
quickstep_storage_StorageBlockLayout
quickstep_storage_StorageManager
quickstep_storage_TupleIdSequence
+ quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
quickstep_threading_SpinMutex
quickstep_threading_ThreadIDBasedMap
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index aeef08a..3caa80f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -40,6 +40,7 @@
#include "storage/StorageBlockLayout.hpp"
#include "storage/StorageManager.hpp"
#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "threading/SpinMutex.hpp"
#include "threading/ThreadIDBasedMap.hpp"
@@ -573,11 +574,7 @@ PartitionSchemeHeader::PartitionAttributeIds PartitionAwareInsertDestination::ge
}
void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
- PartitionSchemeHeader::PartitionValues values;
- for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
- values.push_back(tuple.getAttributeValue(attr_id));
- }
- const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+ const partition_id part_id = getPartitionId(tuple);
MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
@@ -596,11 +593,7 @@ void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
}
void PartitionAwareInsertDestination::insertTupleInBatch(const Tuple &tuple) {
- PartitionSchemeHeader::PartitionValues values;
- for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
- values.push_back(tuple.getAttributeValue(attr_id));
- }
- const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+ const partition_id part_id = getPartitionId(tuple);
MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
@@ -637,12 +630,7 @@ void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor,
// set a bit in the appropriate TupleIdSequence.
accessor->beginIteration();
while (accessor->next()) {
- PartitionSchemeHeader::PartitionValues values;
- for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
- values.push_back(accessor->getTypedValue(attr_id));
- }
- partition_membership[partition_scheme_header_->getPartitionId(values)]
- ->set(accessor->getCurrentPosition(), true);
+ partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
}
// For each partition, create an adapter around Value Accessor and
@@ -693,12 +681,7 @@ void PartitionAwareInsertDestination::bulkInsertTuplesWithRemappedAttributes(
// set a bit in the appropriate TupleIdSequence.
accessor->beginIteration();
while (accessor->next()) {
- PartitionSchemeHeader::PartitionValues values;
- for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
- values.push_back(accessor->getTypedValue(attr_id));
- }
- partition_membership[partition_scheme_header_->getPartitionId(values)]
- ->set(accessor->getCurrentPosition(), true);
+ partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
}
// For each partition, create an adapter around Value Accessor and
@@ -735,12 +718,7 @@ void PartitionAwareInsertDestination::insertTuplesFromVector(std::vector<Tuple>:
}
for (; begin != end; ++begin) {
- PartitionSchemeHeader::PartitionValues values;
- for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
- values.push_back(begin->getAttributeValue(attr_id));
- }
-
- const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+ const partition_id part_id = getPartitionId(*begin);
MutableBlockReference dest_block = getBlockForInsertionInPartition(part_id);
// FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index db58116..878ebb4 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -36,6 +36,7 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageBlockLayout.hpp"
+#include "storage/ValueAccessor.hpp"
#include "threading/SpinMutex.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "types/containers/Tuple.hpp"
@@ -53,7 +54,6 @@ namespace tmb { class MessageBus; }
namespace quickstep {
class StorageManager;
-class ValueAccessor;
namespace merge_run_operator {
class RunCreator;
@@ -201,6 +201,14 @@ class InsertDestination : public InsertDestinationInterface {
virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
std::vector<partition_id> *part_ids) = 0;
+ /**
+ * @brief Record the id of the input partition that the calling work order
+ * is processing.
+ *
+ * This base implementation does nothing. Partition-aware destinations
+ * override it and use the id when their partition scheme header has no
+ * partition attributes.
+ *
+ * @param input_partition_id The input partition id.
+ **/
+ virtual void setInputPartitionId(const partition_id input_partition_id) {
+ // Intentionally a no-op for destinations that are not partition-aware.
+ }
+
protected:
/**
* @brief Get a block to use for insertion.
@@ -541,6 +549,10 @@ class PartitionAwareInsertDestination : public InsertDestination {
void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
std::vector<Tuple>::const_iterator end) override;
+ // Records the partition id that getPartitionId() falls back to when the
+ // partition scheme header has no partition attributes.
+ void setInputPartitionId(const partition_id input_partition_id) override {
+ input_partition_id_ = input_partition_id;
+ }
+
protected:
MutableBlockReference getBlockForInsertion() override {
LOG(FATAL) << "PartitionAwareInsertDestination::getBlockForInsertion needs a partition id as an argument.";
@@ -599,6 +611,24 @@ class PartitionAwareInsertDestination : public InsertDestination {
available_block_refs_[part_id].clear();
}
+ // Maps 'tuple' to an output partition using its partition attribute values;
+ // with no partition attributes, uses the id given to setInputPartitionId().
+ partition_id getPartitionId(const Tuple &tuple) const {
+ const auto &partition_attr_ids = partition_scheme_header_->getPartitionAttributeIds();
+ if (partition_attr_ids.empty()) {
+ return input_partition_id_;
+ }
+
+ PartitionSchemeHeader::PartitionValues partition_values;
+ for (const attribute_id attr : partition_attr_ids) {
+ partition_values.push_back(tuple.getAttributeValue(attr));
+ }
+ return partition_scheme_header_->getPartitionId(partition_values);
+ }
+
+ // Same as above, for the row 'accessor' is currently positioned on.
+ partition_id getPartitionId(ValueAccessor *accessor) const {
+ const auto &partition_attr_ids = partition_scheme_header_->getPartitionAttributeIds();
+ if (partition_attr_ids.empty()) {
+ return input_partition_id_;
+ }
+
+ PartitionSchemeHeader::PartitionValues partition_values;
+ for (const attribute_id attr : partition_attr_ids) {
+ partition_values.push_back(accessor->getTypedValueVirtual(attr));
+ }
+ return partition_scheme_header_->getPartitionId(partition_values);
+ }
+
std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;
// A vector of available block references for each partition.
@@ -612,6 +642,8 @@ class PartitionAwareInsertDestination : public InsertDestination {
// Mutex for locking each partition separately.
SpinMutex *mutexes_for_partition_;
+ // Fallback output partition when the partition scheme header has no
+ // partition attributes; set via setInputPartitionId().
+ // NOTE(review): one destination is shared by all work orders of an operator
+ // (e.g. every SelectWorkOrder receives the same output_destination), and each
+ // sets this at the start of execute(). If workers run those work orders
+ // concurrently, this plain member is written and read without
+ // synchronization -- a data race that can route tuples to another work
+ // order's partition. Consider per-thread storage or passing the partition id
+ // along with each insert.
+ partition_id input_partition_id_ = 0u;
+
DISALLOW_COPY_AND_ASSIGN(PartitionAwareInsertDestination);
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index e1fb770..16a83ee 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -284,6 +284,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
quickstep_queryoptimizer_physical_FilterJoin
quickstep_queryoptimizer_physical_HashJoin
quickstep_queryoptimizer_physical_LIPFilterConfiguration
+ quickstep_queryoptimizer_physical_PartitionSchemeHeader
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_PhysicalType
quickstep_queryoptimizer_physical_TableReference
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index f8bf6f8..98de011 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -34,6 +34,7 @@
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/physical/FilterJoin.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/TableReference.hpp"
@@ -189,6 +190,11 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
}
}
+ const P::PartitionSchemeHeader *partition_scheme_header = input->getOutputPartitionSchemeHeader();
+ if (partition_scheme_header) {
+ node_info.labels.emplace_back(partition_scheme_header->toString());
+ }
+
if (lip_filter_conf_ != nullptr) {
const auto &build_filters = lip_filter_conf_->getBuildInfoMap();
const auto build_it = build_filters.find(input);