You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/05 22:04:10 UTC
[72/72] incubator-quickstep git commit: Added limited optimizer support for Partitioned Hash Joins.
Added limited optimizer support for Partitioned Hash Joins.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ce6a20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ce6a20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ce6a20b
Branch: refs/heads/reorder-partitioned-hash-join
Commit: 5ce6a20b7bcc2c0e363cc85bf9ec8016dca2eaf9
Parents: 27a8055
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Jan 25 01:49:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Feb 5 14:02:37 2017 -0800
----------------------------------------------------------------------
query_execution/QueryContext.cpp | 2 +
query_optimizer/ExecutionGenerator.cpp | 372 ++++++++++++++++---
query_optimizer/ExecutionGenerator.hpp | 7 +-
.../tests/execution_generator/Partition.test | 71 +++-
types/TypedValue.hpp | 19 +
5 files changed, 414 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 71839a7..3681a3b 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -54,6 +54,8 @@ using std::vector;
namespace quickstep {
+constexpr QueryContext::insert_destination_id QueryContext::kInvalidInsertDestinationId;
+
QueryContext::QueryContext(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6918313..828d21f 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -134,8 +134,12 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+using std::find;
+using std::make_unique;
using std::move;
+using std::size_t;
using std::static_pointer_cast;
+using std::swap;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
@@ -159,6 +163,8 @@ static const volatile bool aggregate_hashtable_type_dummy
DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
+DEFINE_uint64(num_repartitions, 4, "Number of repartitions for a hash join.");
+
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
namespace S = ::quickstep::serialization;
@@ -421,7 +427,8 @@ void ExecutionGenerator::convertTableReference(
std::piecewise_construct,
std::forward_as_tuple(physical_table_reference),
std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex,
- catalog_relation));
+ catalog_relation,
+ QueryContext::kInvalidInsertDestinationId));
}
void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
@@ -461,8 +468,9 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
std::piecewise_construct,
std::forward_as_tuple(physical_sample),
std::forward_as_tuple(sample_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(sample_index, output_relation);
+ output_relation,
+ insert_destination_index));
+ temporary_relation_info_vec_.emplace_back(sample_index, output_relation, insert_destination_index);
}
bool ExecutionGenerator::convertSimpleProjection(
@@ -595,8 +603,9 @@ void ExecutionGenerator::convertSelection(
std::piecewise_construct,
std::forward_as_tuple(physical_selection),
std::forward_as_tuple(select_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(select_index, output_relation);
+ output_relation,
+ insert_destination_index));
+ temporary_relation_info_vec_.emplace_back(select_index, output_relation, insert_destination_index);
if (lip_filter_generator_ != nullptr) {
lip_filter_generator_->addSelectionInfo(physical_selection, select_index);
@@ -679,6 +688,64 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
build_filter_operator_index);
}
+namespace {
+
+bool areSamePartitionSchemeHeaders(const PartitionSchemeHeader &lhs_partition_header,
+ const CatalogRelationSchema &lhs_scheme,
+ const PartitionSchemeHeader &rhs_partition_header,
+ const CatalogRelationSchema &rhs_scheme) {
+ if (lhs_partition_header.getPartitionType() != rhs_partition_header.getPartitionType()) {
+ return false;
+ }
+
+ if (lhs_partition_header.getNumPartitions() != rhs_partition_header.getNumPartitions()) {
+ return false;
+ }
+
+ // Check whether the underlying types in CatalogAttribute are the same.
+ if (!lhs_scheme.getAttributeById(lhs_partition_header.getPartitionAttributeId())->getType().equals(
+ rhs_scheme.getAttributeById(rhs_partition_header.getPartitionAttributeId())->getType())) {
+ return false;
+ }
+
+ switch (lhs_partition_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kHash:
+ return true;
+ case PartitionSchemeHeader::PartitionType::kRange: {
+ const vector<TypedValue> &lhs_ranges =
+ static_cast<const RangePartitionSchemeHeader&>(lhs_partition_header).getPartitionRangeBoundaries();
+ const vector<TypedValue> &rhs_ranges =
+ static_cast<const RangePartitionSchemeHeader&>(rhs_partition_header).getPartitionRangeBoundaries();
+
+ return lhs_ranges == rhs_ranges;
+ }
+ }
+
+ return false;
+}
+
+
+// Note that this method will be deprecated once the partition scheme header
+// supports multiple partition attributes.
+size_t chooseBestRepartitionAttributeIndex(const CatalogRelationStatistics &stats,
+ const vector<attribute_id> &join_attributes) {
+ size_t chose_attr_index = static_cast<size_t>(-1);
+ size_t chose_attr_num_distinct_values = 0;
+
+ for (std::size_t i = 0; i < join_attributes.size(); ++i) {
+ const attribute_id attr = join_attributes[i];
+ if (stats.hasNumDistinctValues(attr) &&
+ stats.getNumDistinctValues(attr) > chose_attr_num_distinct_values) {
+ chose_attr_index = i;
+ chose_attr_num_distinct_values = stats.getNumDistinctValues(attr);
+ }
+ }
+
+ return (chose_attr_index != static_cast<size_t>(-1)) ? chose_attr_index : 0;
+}
+
+} // namespace
+
void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
// HashJoin is converted to three operators:
// BuildHash, HashJoin, DestroyHash. The second is the primary operator.
@@ -689,13 +756,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
std::vector<attribute_id> probe_attribute_ids;
std::vector<attribute_id> build_attribute_ids;
- std::size_t build_cardinality =
- cost_model_for_hash_join_->estimateCardinality(build_physical);
-
bool any_probe_attributes_nullable = false;
bool any_build_attributes_nullable = false;
- const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+ std::vector<E::AttributeReferencePtr> left_join_attributes =
physical_plan->left_join_attributes();
for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
const CatalogAttribute *probe_catalog_attribute
@@ -707,7 +771,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
}
}
- const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+ std::vector<E::AttributeReferencePtr> right_join_attributes =
physical_plan->right_join_attributes();
for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
const CatalogAttribute *build_catalog_attribute
@@ -733,6 +797,218 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
key_types.push_back(&left_attribute_type);
}
+ const CatalogRelationInfo *build_relation_info =
+ findRelationInfoOutputByPhysical(build_physical);
+ const CatalogRelationInfo *probe_operator_info =
+ findRelationInfoOutputByPhysical(probe_physical);
+
+ const CatalogRelation *build_relation = build_relation_info->relation;
+ const CatalogRelation *probe_relation = probe_operator_info->relation;
+
+ // FIXME(quickstep-team): Add support for self-join.
+ if (build_relation == probe_relation) {
+ THROW_SQL_ERROR() << "Self-join is not supported";
+ }
+
+ const PartitionScheme *build_partition_scheme = build_relation->getPartitionScheme();
+ const PartitionScheme *probe_partition_scheme = probe_relation->getPartitionScheme();
+
+ bool build_needs_repartition = false;
+ bool probe_needs_repartition = false;
+ bool needs_swap = false;
+ if (build_partition_scheme && probe_partition_scheme) {
+ const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+ const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader();
+
+ switch (build_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ build_needs_repartition = true;
+
+ switch (probe_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ probe_needs_repartition = true;
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+ if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) !=
+ probe_attribute_ids.end()) {
+ needs_swap = true;
+ } else {
+ probe_needs_repartition = true;
+ }
+ break;
+ }
+ }
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+ if (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) !=
+ build_attribute_ids.end()) {
+ // BuildRelation has a useful partition.
+ switch (probe_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ probe_needs_repartition = true;
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ if (areSamePartitionSchemeHeaders(build_partition_scheme_header, *build_relation,
+ probe_partition_scheme_header, *probe_relation)) {
+ if (cost_model_for_hash_join_->estimateCardinality(build_physical) >
+ cost_model_for_hash_join_->estimateCardinality(probe_physical)) {
+ needs_swap = true;
+ }
+ } else {
+ probe_needs_repartition = true;
+ }
+ break;
+ }
+ }
+ } else {
+ build_needs_repartition = true;
+
+ switch (probe_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ probe_needs_repartition = true;
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+ if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) !=
+ probe_attribute_ids.end()) {
+ needs_swap = true;
+ } else {
+ probe_needs_repartition = true;
+ }
+ break;
+ }
+ }
+ }
+ break;
+ }
+ }
+ } else if (probe_partition_scheme) {
+ needs_swap = true;
+
+ const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader();
+ switch (probe_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ probe_needs_repartition = true;
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+
+ probe_needs_repartition =
+ (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) ==
+ probe_attribute_ids.end());
+ break;
+ }
+ }
+ } else if (build_partition_scheme) {
+ const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+ switch (build_partition_scheme_header.getPartitionType()) {
+ case PartitionSchemeHeader::PartitionType::kRange:
+ build_needs_repartition = true;
+ break;
+ case PartitionSchemeHeader::PartitionType::kHash: {
+ const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+ build_needs_repartition =
+ (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) ==
+ build_attribute_ids.end());
+ break;
+ }
+ }
+ }
+
+ if (needs_swap) {
+ swap(probe_physical, build_physical);
+ swap(probe_attribute_ids, build_attribute_ids);
+ swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+ swap(left_join_attributes, right_join_attributes);
+ swap(probe_operator_info, build_relation_info);
+ swap(probe_relation, build_relation);
+ swap(probe_partition_scheme, build_partition_scheme);
+ swap(probe_needs_repartition, build_needs_repartition);
+ }
+
+ if ((build_needs_repartition && build_relation_info->isStoredRelation()) ||
+ (probe_needs_repartition && probe_operator_info->isStoredRelation())) {
+ THROW_SQL_ERROR() << "Re-partition for the base table is not supported";
+ }
+
+ if (!build_needs_repartition && probe_needs_repartition) {
+ const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+ const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+
+ size_t repartition_attr_index = 0;
+ while (build_attribute_ids[repartition_attr_index] != build_partition_attr) {
+ ++repartition_attr_index;
+ }
+ auto probe_repartition_scheme_header =
+ make_unique<HashPartitionSchemeHeader>(build_partition_scheme_header.getNumPartitions(),
+ probe_attribute_ids[repartition_attr_index]);
+ auto probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release());
+
+ CatalogRelation *mutable_probe_relation =
+ catalog_database_->getRelationByIdMutable(probe_relation->getID());
+ mutable_probe_relation->setPartitionScheme(probe_repartition_scheme.release());
+
+ probe_partition_scheme = probe_relation->getPartitionScheme();
+ DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+ probe_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+ S::InsertDestination *probe_insert_destination_proto =
+ query_context_proto_->mutable_insert_destinations(probe_operator_info->output_destination_index);
+ probe_insert_destination_proto->Clear();
+
+ probe_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+ probe_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+ ->MergeFrom(probe_partition_scheme->getProto());
+ } else if (build_needs_repartition) {
+ const size_t repartition_attr_index =
+ chooseBestRepartitionAttributeIndex(build_relation->getStatistics(), build_attribute_ids);
+ auto build_repartition_scheme_header =
+ make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions,
+ build_attribute_ids[repartition_attr_index]);
+ auto build_repartition_scheme = make_unique<PartitionScheme>(build_repartition_scheme_header.release());
+
+ CatalogRelation *mutable_build_relation =
+ catalog_database_->getRelationByIdMutable(build_relation->getID());
+ mutable_build_relation->setPartitionScheme(build_repartition_scheme.release());
+
+ build_partition_scheme = build_relation->getPartitionScheme();
+ DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+ build_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+ S::InsertDestination *build_insert_destination_proto =
+ query_context_proto_->mutable_insert_destinations(build_relation_info->output_destination_index);
+ build_insert_destination_proto->Clear();
+
+ build_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+ build_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+ ->MergeFrom(build_partition_scheme->getProto());
+
+ if (probe_needs_repartition) {
+ auto probe_repartition_scheme_header =
+ make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions,
+ probe_attribute_ids[repartition_attr_index]);
+ auto probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release());
+
+ CatalogRelation *mutable_probe_relation =
+ catalog_database_->getRelationByIdMutable(probe_relation->getID());
+ mutable_probe_relation->setPartitionScheme(probe_repartition_scheme.release());
+
+ probe_partition_scheme = probe_relation->getPartitionScheme();
+ DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+ probe_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+ S::InsertDestination *probe_insert_destination_proto =
+ query_context_proto_->mutable_insert_destinations(probe_operator_info->output_destination_index);
+ probe_insert_destination_proto->Clear();
+
+ probe_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+ probe_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+ ->MergeFrom(probe_partition_scheme->getProto());
+ }
+ }
+
// Convert the residual predicate proto.
QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
if (physical_plan->residual_predicate()) {
@@ -748,11 +1024,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
convertNamedExpressions(physical_plan->project_expressions(),
query_context_proto_->add_scalar_groups());
- const CatalogRelationInfo *build_relation_info =
- findRelationInfoOutputByPhysical(build_physical);
- const CatalogRelationInfo *probe_operator_info =
- findRelationInfoOutputByPhysical(probe_physical);
-
// Create a vector that indicates whether each project expression is using
// attributes from the build relation as input. This information is required
// by the current implementation of hash left outer join
@@ -765,30 +1036,17 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
build_physical->getOutputAttributes())));
}
- const CatalogRelation *build_relation = build_relation_info->relation;
-
- // FIXME(quickstep-team): Add support for self-join.
- if (build_relation == probe_operator_info->relation) {
- THROW_SQL_ERROR() << "Self-join is not supported";
- }
-
// Create join hash table proto.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto_->join_hash_tables_size();
S::QueryContext::HashTableContext *hash_table_context_proto =
query_context_proto_->add_join_hash_tables();
- // No partition.
- std::size_t num_partitions = 1;
- if (build_relation->hasPartitionScheme() &&
- build_attribute_ids.size() == 1) {
- const PartitionSchemeHeader &partition_scheme_header =
- build_relation->getPartitionScheme()->getPartitionSchemeHeader();
- if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeId()) {
- // TODO(zuyu): add optimizer support for partitioned hash joins.
- hash_table_context_proto->set_num_partitions(num_partitions);
- }
- }
+ const std::size_t build_num_partitions =
+ build_partition_scheme
+ ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+ : 1u;
+ hash_table_context_proto->set_num_partitions(build_num_partitions);
S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
@@ -805,6 +1063,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
build_relation->getAttributeById(build_attribute)->getType().getProto());
}
+ const std::size_t build_cardinality =
+ cost_model_for_hash_join_->estimateCardinality(build_physical);
hash_table_proto->set_estimated_num_entries(build_cardinality);
// Create three operators.
@@ -816,7 +1076,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
build_relation_info->isStoredRelation(),
build_attribute_ids,
any_build_attributes_nullable,
- num_partitions,
+ build_num_partitions,
join_hash_table_index));
// Create InsertDestination proto.
@@ -855,11 +1115,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
new HashJoinOperator(
query_handle_->query_id(),
*build_relation,
- *probe_operator_info->relation,
+ *probe_relation,
probe_operator_info->isStoredRelation(),
probe_attribute_ids,
any_probe_attributes_nullable,
- num_partitions,
+ build_num_partitions,
*output_relation,
insert_destination_index,
join_hash_table_index,
@@ -871,7 +1131,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
const QueryPlan::DAGNodeIndex destroy_operator_index =
execution_plan_->addRelationalOperator(new DestroyHashOperator(
- query_handle_->query_id(), num_partitions, join_hash_table_index));
+ query_handle_->query_id(), build_num_partitions, join_hash_table_index));
if (!build_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(build_operator_index,
@@ -902,8 +1162,9 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
std::piecewise_construct,
std::forward_as_tuple(physical_plan),
std::forward_as_tuple(join_operator_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+ output_relation,
+ insert_destination_index));
+ temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index);
if (lip_filter_generator_ != nullptr) {
lip_filter_generator_->addHashJoinInfo(physical_plan,
@@ -979,8 +1240,9 @@ void ExecutionGenerator::convertNestedLoopsJoin(
std::piecewise_construct,
std::forward_as_tuple(physical_plan),
std::forward_as_tuple(join_operator_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+ output_relation,
+ insert_destination_index));
+ temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index);
}
void ExecutionGenerator::convertCopyFrom(
@@ -1578,9 +1840,12 @@ void ExecutionGenerator::convertAggregate(
physical_to_output_relation_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+ std::forward_as_tuple(finalize_aggregation_operator_index,
+ output_relation,
+ insert_destination_index));
temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
- output_relation);
+ output_relation,
+ insert_destination_index);
const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
execution_plan_->addRelationalOperator(
@@ -1641,7 +1906,8 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
false /* is_pipeline_breaker */);
}
temporary_relation_info_vec_.emplace_back(run_generator_index,
- initial_runs_relation);
+ initial_runs_relation,
+ initial_runs_destination_id);
initial_runs_destination_proto->set_relational_op_index(run_generator_index);
// Create sort configuration for run merging.
@@ -1716,12 +1982,14 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
true /* is_pipeline_breaker */);
temporary_relation_info_vec_.emplace_back(merge_run_operator_index,
- sorted_relation);
+ sorted_relation,
+ sorted_output_destination_id);
physical_to_output_relation_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(physical_sort),
std::forward_as_tuple(merge_run_operator_index,
- sorted_relation));
+ sorted_relation,
+ sorted_output_destination_id));
}
void ExecutionGenerator::convertTableGenerator(
@@ -1756,8 +2024,9 @@ void ExecutionGenerator::convertTableGenerator(
std::piecewise_construct,
std::forward_as_tuple(physical_tablegen),
std::forward_as_tuple(tablegen_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
+ output_relation,
+ insert_destination_index));
+ temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation, insert_destination_index);
}
void ExecutionGenerator::convertWindowAggregate(
@@ -1860,9 +2129,12 @@ void ExecutionGenerator::convertWindowAggregate(
physical_to_output_relation_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(window_aggregation_operator_index, output_relation));
+ std::forward_as_tuple(window_aggregation_operator_index,
+ output_relation,
+ insert_destination_index));
temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index,
- output_relation);
+ output_relation,
+ insert_destination_index);
}
} // namespace optimizer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index eba6eee..8941141 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -132,9 +132,11 @@ class ExecutionGenerator {
*/
struct CatalogRelationInfo {
CatalogRelationInfo(const QueryPlan::DAGNodeIndex producer_operator_index_in,
- const CatalogRelation *relation_in)
+ const CatalogRelation *relation_in,
+ const QueryContext::insert_destination_id output_destination_index_in)
: producer_operator_index(producer_operator_index_in),
- relation(relation_in) {}
+ relation(relation_in),
+ output_destination_index(output_destination_index_in) {}
/**
* @return True if the relation is a stored relation (i.e. not a temporary relation
@@ -146,6 +148,7 @@ class ExecutionGenerator {
const QueryPlan::DAGNodeIndex producer_operator_index;
const CatalogRelation *relation;
+ const QueryContext::insert_destination_id output_destination_index;
/**
* @brief Represents an invalid node index.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/tests/execution_generator/Partition.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Partition.test b/query_optimizer/tests/execution_generator/Partition.test
index ab05391..3bee78b 100644
--- a/query_optimizer/tests/execution_generator/Partition.test
+++ b/query_optimizer/tests/execution_generator/Partition.test
@@ -15,19 +15,35 @@
# specific language governing permissions and limitations
# under the License.
-CREATE TABLE foo (id INT NULL,
- name CHAR(20))
+CREATE TABLE dim (id INT NULL,
+ char_col CHAR(20))
+PARTITION BY HASH(id) PARTITIONS 4;
+CREATE TABLE dim2 (id INT NULL,
+ char_col CHAR(20))
+PARTITION BY HASH(id) PARTITIONS 2;
+CREATE TABLE fact (id INT NULL,
+ score DOUBLE NULL)
PARTITION BY HASH(id) PARTITIONS 4;
-INSERT INTO foo
+INSERT INTO dim
SELECT int_col, char_col
FROM test
WHERE int_col > 0 OR int_col < 0;
-SELECT * FROM foo;
+INSERT INTO dim2
+SELECT int_col, char_col
+FROM test
+WHERE int_col > 0 OR int_col < 0;
+
+INSERT INTO fact
+SELECT int_col, double_col
+FROM test
+WHERE int_col % 2 = 0;
+
+SELECT * FROM dim;
--
+-----------+--------------------+
-|id |name |
+|id |char_col |
+-----------+--------------------+
| 4| 4 2.000000|
| 8| 8 2.828427|
@@ -52,3 +68,48 @@ SELECT * FROM foo;
| -17| -17 4.123106|
| -21| -21 4.582576|
+-----------+--------------------+
+==
+
+# Partitioned Hash Join.
+SELECT fact.id, dim.char_col
+FROM dim JOIN fact ON dim.id = fact.id;
+--
++-----------+--------------------+
+|id |char_col |
++-----------+--------------------+
+| 4| 4 2.000000|
+| 8| 8 2.828427|
+| 12| 12 3.464102|
+| 16| 16 4.000000|
+| 24| 24 4.898979|
+| 2| 2 1.414214|
+| 6| 6 2.449490|
+| 14| 14 3.741657|
+| 18| 18 4.242641|
+| 22| 22 4.690416|
++-----------+--------------------+
+==
+
+# Hash Join with two stored relations, one of which is partitioned.
+SELECT fact.id, test.char_col
+FROM test JOIN fact ON test.int_col = fact.id;
+--
+[same as above]
+==
+
+# Hash Join with one stored, partitioned relation,
+# and a non-stored, non-partitioned one.
+SELECT fact.id, test.char_col
+FROM fact JOIN test ON fact.id = test.int_col
+WHERE test.int_col % 2 = 0;
+--
+[same as above]
+==
+
+# Repartitioned Hash Join.
+SELECT fact.id, dim2.char_col
+FROM dim2, fact
+WHERE dim2.id = fact.id
+ AND dim2.id % 2 = 0;
+--
+[same as above]
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/types/TypedValue.hpp
----------------------------------------------------------------------
diff --git a/types/TypedValue.hpp b/types/TypedValue.hpp
index 0ba3d53..1b564c5 100644
--- a/types/TypedValue.hpp
+++ b/types/TypedValue.hpp
@@ -253,6 +253,25 @@ class TypedValue {
}
/**
+ * @brief Equality operator.
+ **/
+ bool operator==(const TypedValue &rhs) const {
+ if (getTypeID() != rhs.getTypeID()) {
+ return false;
+ }
+
+ if (isNull() != rhs.isNull()) {
+ return false;
+ }
+
+ if (isNull()) {
+ return true;
+ }
+
+ return fastEqualCheck(rhs);
+ }
+
+ /**
* @brief Create a new literal TypedValue with pre-allocated out-of-line
* data.
* @warning The memory at value_ptr must be allocated with malloc() or