You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sp...@apache.org on 2016/12/11 17:45:37 UTC
[14/51] [abbrv] [partial] incubator-quickstep git commit: remove c++
files
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
deleted file mode 100644
index 2e0d8f3..0000000
--- a/query_optimizer/ExecutionGenerator.cpp
+++ /dev/null
@@ -1,1725 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_optimizer/ExecutionGenerator.hpp"
-
-#include <algorithm>
-#include <cstddef>
-#include <memory>
-#include <string>
-#include <type_traits>
-#include <unordered_map>
-
-#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED.
-
-#ifdef QUICKSTEP_DISTRIBUTED
-#include <unordered_set>
-#endif
-
-#include <utility>
-#include <vector>
-
-#ifdef QUICKSTEP_DISTRIBUTED
-#include "catalog/Catalog.pb.h"
-#endif
-
-#include "catalog/CatalogAttribute.hpp"
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogRelationSchema.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
-#include "expressions/predicate/Predicate.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/scalar/ScalarAttribute.hpp"
-#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
-#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/LIPFilterGenerator.hpp"
-#include "query_optimizer/OptimizerContext.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "query_optimizer/cost_model/SimpleCostModel.hpp"
-#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
-#include "query_optimizer/expressions/AggregateFunction.hpp"
-#include "query_optimizer/expressions/Alias.hpp"
-#include "query_optimizer/expressions/AttributeReference.hpp"
-#include "query_optimizer/expressions/ComparisonExpression.hpp"
-#include "query_optimizer/expressions/ExpressionType.hpp"
-#include "query_optimizer/expressions/PatternMatcher.hpp"
-#include "query_optimizer/expressions/Scalar.hpp"
-#include "query_optimizer/expressions/ScalarLiteral.hpp"
-#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
-#include "query_optimizer/physical/CopyFrom.hpp"
-#include "query_optimizer/physical/CreateIndex.hpp"
-#include "query_optimizer/physical/CreateTable.hpp"
-#include "query_optimizer/physical/DeleteTuples.hpp"
-#include "query_optimizer/physical/DropTable.hpp"
-#include "query_optimizer/physical/HashJoin.hpp"
-#include "query_optimizer/physical/InsertSelection.hpp"
-#include "query_optimizer/physical/InsertTuple.hpp"
-#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
-#include "query_optimizer/physical/NestedLoopsJoin.hpp"
-#include "query_optimizer/physical/PatternMatcher.hpp"
-#include "query_optimizer/physical/Physical.hpp"
-#include "query_optimizer/physical/PhysicalType.hpp"
-#include "query_optimizer/physical/Sample.hpp"
-#include "query_optimizer/physical/Selection.hpp"
-#include "query_optimizer/physical/SharedSubplanReference.hpp"
-#include "query_optimizer/physical/Sort.hpp"
-#include "query_optimizer/physical/TableGenerator.hpp"
-#include "query_optimizer/physical/TableReference.hpp"
-#include "query_optimizer/physical/TopLevelPlan.hpp"
-#include "query_optimizer/physical/UpdateTable.hpp"
-#include "query_optimizer/physical/WindowAggregate.hpp"
-#include "relational_operators/AggregationOperator.hpp"
-#include "relational_operators/BuildHashOperator.hpp"
-#include "relational_operators/CreateIndexOperator.hpp"
-#include "relational_operators/CreateTableOperator.hpp"
-#include "relational_operators/DeleteOperator.hpp"
-#include "relational_operators/DestroyAggregationStateOperator.hpp"
-#include "relational_operators/DestroyHashOperator.hpp"
-#include "relational_operators/DropTableOperator.hpp"
-#include "relational_operators/FinalizeAggregationOperator.hpp"
-#include "relational_operators/HashJoinOperator.hpp"
-#include "relational_operators/InsertOperator.hpp"
-#include "relational_operators/NestedLoopsJoinOperator.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/SampleOperator.hpp"
-#include "relational_operators/SaveBlocksOperator.hpp"
-#include "relational_operators/SelectOperator.hpp"
-#include "relational_operators/SortMergeRunOperator.hpp"
-#include "relational_operators/SortRunGenerationOperator.hpp"
-#include "relational_operators/TableGeneratorOperator.hpp"
-#include "relational_operators/TextScanOperator.hpp"
-#include "relational_operators/UpdateOperator.hpp"
-#include "relational_operators/WindowAggregationOperator.hpp"
-#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.pb.h"
-#include "storage/HashTableFactory.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlockLayout.hpp"
-#include "storage/StorageBlockLayout.pb.h"
-#include "storage/SubBlockTypeRegistry.hpp"
-#include "types/Type.hpp"
-#include "types/Type.pb.h"
-#include "types/TypedValue.hpp"
-#include "types/TypedValue.pb.h"
-#include "types/containers/Tuple.pb.h"
-#include "utility/SqlError.hpp"
-
-#include "gflags/gflags.h"
-#include "glog/logging.h"
-
-using std::move;
-using std::static_pointer_cast;
-using std::unique_ptr;
-using std::unordered_map;
-using std::vector;
-
-namespace quickstep {
-namespace optimizer {
-
-DEFINE_string(join_hashtable_type, "SeparateChaining",
- "HashTable implementation to use for hash joins (valid options "
- "are SeparateChaining or LinearOpenAddressing)");
-static const volatile bool join_hashtable_type_dummy
- = gflags::RegisterFlagValidator(&FLAGS_join_hashtable_type,
- &ValidateHashTableImplTypeString);
-
-DEFINE_string(aggregate_hashtable_type, "SeparateChaining",
- "HashTable implementation to use for aggregates with GROUP BY "
- "(valid options are SeparateChaining or LinearOpenAddressing)");
-static const volatile bool aggregate_hashtable_type_dummy
- = gflags::RegisterFlagValidator(&FLAGS_aggregate_hashtable_type,
- &ValidateHashTableImplTypeString);
-
-DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
-
-namespace E = ::quickstep::optimizer::expressions;
-namespace P = ::quickstep::optimizer::physical;
-namespace S = ::quickstep::serialization;
-
-constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex;
-
-void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
- CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(physical_plan, &top_level_physical_plan_))
- << "The physical plan must be rooted by a TopLevelPlan";
-
- cost_model_for_aggregation_.reset(
- new cost::StarSchemaSimpleCostModel(top_level_physical_plan_->shared_subplans()));
- cost_model_for_hash_join_.reset(
- new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans()));
-
- const auto &lip_filter_configuration =
- top_level_physical_plan_->lip_filter_configuration();
- if (lip_filter_configuration != nullptr) {
- lip_filter_generator_.reset(new LIPFilterGenerator(lip_filter_configuration));
- }
-
- const CatalogRelation *result_relation = nullptr;
-
- try {
- for (const P::PhysicalPtr &shared_subplan : top_level_physical_plan_->shared_subplans()) {
- generatePlanInternal(shared_subplan);
- }
- generatePlanInternal(top_level_physical_plan_->plan());
-
- // Deploy LIPFilters if enabled.
- if (lip_filter_generator_ != nullptr) {
- lip_filter_generator_->deployLIPFilters(execution_plan_, query_context_proto_);
- }
-
- // Set the query result relation if the input plan exists in physical_to_execution_map_,
- // which indicates the plan is the result of a SELECT query.
- const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator it =
- physical_to_output_relation_map_.find(top_level_physical_plan_->plan());
- if (it != physical_to_output_relation_map_.end()) {
- result_relation = it->second.relation;
- }
- } catch (...) {
- // Drop all temporary relations.
- dropAllTemporaryRelations();
- throw;
- }
-
- // Add one DropTableOperator per temporary relation, except for the result relation, if any.
- // NOTE(zuyu): the Cli shell drops the result relation after printing, if enabled.
- for (const CatalogRelationInfo &temporary_relation_info : temporary_relation_info_vec_) {
- const CatalogRelation *temporary_relation = temporary_relation_info.relation;
- if (temporary_relation == result_relation) {
- query_handle_->setQueryResultRelation(
- catalog_database_->getRelationByIdMutable(result_relation->getID()));
- continue;
- }
- const QueryPlan::DAGNodeIndex drop_table_index =
- execution_plan_->addRelationalOperator(
- new DropTableOperator(query_handle_->query_id(),
- *temporary_relation,
- catalog_database_,
- false /* only_drop_blocks */));
- DCHECK(!temporary_relation_info.isStoredRelation());
- execution_plan_->addDependenciesForDropOperator(
- drop_table_index,
- temporary_relation_info.producer_operator_index);
- }
-
-#ifdef QUICKSTEP_DISTRIBUTED
- catalog_database_cache_proto_->set_name(catalog_database_->getName());
-
- LOG(INFO) << "CatalogDatabaseCache proto has " << referenced_relation_ids_.size() << " relation(s)";
- for (const relation_id rel_id : referenced_relation_ids_) {
- const CatalogRelationSchema &relation =
- catalog_database_->getRelationSchemaById(rel_id);
- LOG(INFO) << "RelationSchema " << rel_id
- << ", name: " << relation.getName()
- << ", " << relation.size() << " attribute(s)";
- catalog_database_cache_proto_->add_relations()->MergeFrom(relation.getProto());
- }
-#endif
-}
-
-void ExecutionGenerator::generatePlanInternal(
- const P::PhysicalPtr &physical_plan) {
- // Generate the execution plan in bottom-up.
- for (const P::PhysicalPtr &child : physical_plan->children()) {
- generatePlanInternal(child);
- }
-
- // If enabled, collect attribute substitution map for LIPFilterGenerator.
- if (lip_filter_generator_ != nullptr) {
- lip_filter_generator_->registerAttributeMap(physical_plan, attribute_substitution_map_);
- }
-
- switch (physical_plan->getPhysicalType()) {
- case P::PhysicalType::kAggregate:
- return convertAggregate(
- std::static_pointer_cast<const P::Aggregate>(physical_plan));
- case P::PhysicalType::kCopyFrom:
- return convertCopyFrom(
- std::static_pointer_cast<const P::CopyFrom>(physical_plan));
- case P::PhysicalType::kCreateIndex:
- return convertCreateIndex(
- std::static_pointer_cast<const P::CreateIndex>(physical_plan));
- case P::PhysicalType::kCreateTable:
- return convertCreateTable(
- std::static_pointer_cast<const P::CreateTable>(physical_plan));
- case P::PhysicalType::kDeleteTuples:
- return convertDeleteTuples(
- std::static_pointer_cast<const P::DeleteTuples>(physical_plan));
- case P::PhysicalType::kDropTable:
- return convertDropTable(
- std::static_pointer_cast<const P::DropTable>(physical_plan));
- case P::PhysicalType::kHashJoin:
- return convertHashJoin(
- std::static_pointer_cast<const P::HashJoin>(physical_plan));
- case P::PhysicalType::kInsertSelection:
- return convertInsertSelection(
- std::static_pointer_cast<const P::InsertSelection>(physical_plan));
- case P::PhysicalType::kInsertTuple:
- return convertInsertTuple(
- std::static_pointer_cast<const P::InsertTuple>(physical_plan));
- case P::PhysicalType::kNestedLoopsJoin:
- return convertNestedLoopsJoin(
- std::static_pointer_cast<const P::NestedLoopsJoin>(physical_plan));
- case P::PhysicalType::kSample:
- return convertSample(
- std::static_pointer_cast<const P::Sample>(physical_plan));
- case P::PhysicalType::kSelection:
- return convertSelection(
- std::static_pointer_cast<const P::Selection>(physical_plan));
- case P::PhysicalType::kSharedSubplanReference:
- return convertSharedSubplanReference(
- std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan));
- case P::PhysicalType::kSort:
- return convertSort(
- std::static_pointer_cast<const P::Sort>(physical_plan));
- case P::PhysicalType::kTableGenerator:
- return convertTableGenerator(
- std::static_pointer_cast<const P::TableGenerator>(physical_plan));
- case P::PhysicalType::kTableReference:
- return convertTableReference(
- std::static_pointer_cast<const P::TableReference>(physical_plan));
- case P::PhysicalType::kUpdateTable:
- return convertUpdateTable(
- std::static_pointer_cast<const P::UpdateTable>(physical_plan));
- case P::PhysicalType::kWindowAggregate:
- return convertWindowAggregate(
- std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
- default:
- LOG(FATAL) << "Unknown physical plan node "
- << physical_plan->getShortString();
- }
-}
-
-std::string ExecutionGenerator::getNewRelationName() {
- std::ostringstream out;
- out << OptimizerContext::kInternalTemporaryRelationNamePrefix
- << query_handle_->query_id() << "_" << rel_id_;
- ++rel_id_;
- return out.str();
-}
-
-void ExecutionGenerator::createTemporaryCatalogRelation(
- const P::PhysicalPtr &physical,
- const CatalogRelation **catalog_relation_output,
- S::InsertDestination *insert_destination_proto) {
- std::unique_ptr<CatalogRelation> catalog_relation(
- new CatalogRelation(catalog_database_,
- getNewRelationName(),
- -1 /* id */,
- true /* is_temporary*/));
- attribute_id aid = 0;
- for (const E::NamedExpressionPtr &project_expression :
- physical->getOutputAttributes()) {
- // The attribute name is simply set to the attribute id to make it distinct.
- std::unique_ptr<CatalogAttribute> catalog_attribute(
- new CatalogAttribute(catalog_relation.get(),
- std::to_string(aid),
- project_expression->getValueType(),
- aid,
- project_expression->attribute_alias()));
- attribute_substitution_map_[project_expression->id()] =
- catalog_attribute.get();
- catalog_relation->addAttribute(catalog_attribute.release());
- ++aid;
- }
-
- *catalog_relation_output = catalog_relation.get();
- const relation_id output_rel_id = catalog_database_->addRelation(
- catalog_relation.release());
-
-#ifdef QUICKSTEP_DISTRIBUTED
- referenced_relation_ids_.insert(output_rel_id);
-#endif
-
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(output_rel_id);
-}
-
-void ExecutionGenerator::dropAllTemporaryRelations() {
- for (const CatalogRelationInfo &temporary_relation_info :
- temporary_relation_info_vec_) {
- DCHECK_EQ(temporary_relation_info.relation->size_blocks(), 0u);
- catalog_database_->dropRelationById(temporary_relation_info.relation->getID());
- }
-}
-
-void ExecutionGenerator::convertNamedExpressions(
- const std::vector<E::NamedExpressionPtr> &named_expressions,
- S::QueryContext::ScalarGroup *scalar_group_proto) {
- for (const E::NamedExpressionPtr &project_expression : named_expressions) {
- unique_ptr<const Scalar> execution_scalar;
- E::AliasPtr alias;
- if (E::SomeAlias::MatchesWithConditionalCast(project_expression, &alias)) {
- E::ScalarPtr scalar;
- // We have not added aggregate expressions yet,
- // so all child expressions of an Alias should be a Scalar.
- CHECK(E::SomeScalar::MatchesWithConditionalCast(alias->expression(), &scalar))
- << alias->toString();
- execution_scalar.reset(scalar->concretize(attribute_substitution_map_));
- } else {
- execution_scalar.reset(project_expression->concretize(attribute_substitution_map_));
- }
-
- scalar_group_proto->add_scalars()->CopyFrom(execution_scalar->getProto());
- }
-}
-
-Predicate* ExecutionGenerator::convertPredicate(
- const expressions::PredicatePtr &optimizer_predicate) const {
- return optimizer_predicate->concretize(attribute_substitution_map_);
-}
-
-void ExecutionGenerator::convertTableReference(
- const P::TableReferencePtr &physical_table_reference) {
- // TableReference is not converted to an execution operator;
- // instead it just provides CatalogRelation info for its
- // parent (e.g. the substitution map from an AttributeReference
- // to a CatalogAttribute).
- const CatalogRelation *catalog_relation = physical_table_reference->relation();
-
-#ifdef QUICKSTEP_DISTRIBUTED
- referenced_relation_ids_.insert(catalog_relation->getID());
-#endif
-
- const std::vector<E::AttributeReferencePtr> &attribute_references =
- physical_table_reference->attribute_list();
- DCHECK_EQ(attribute_references.size(), catalog_relation->size());
-
- for (CatalogRelation::size_type i = 0; i < catalog_relation->size(); ++i) {
- attribute_substitution_map_.emplace(attribute_references[i]->id(),
- catalog_relation->getAttributeById(i));
- }
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_table_reference),
- std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex,
- catalog_relation));
-}
-
-void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto =
- query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_sample,
- &output_relation,
- insert_destination_proto);
-
- // Create and add a Sample operator.
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_sample->input());
- DCHECK(input_relation_info != nullptr);
-
- SampleOperator *sample_op =
- new SampleOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- *output_relation,
- insert_destination_index,
- input_relation_info->isStoredRelation(),
- physical_sample->is_block_sample(),
- physical_sample->percentage());
- const QueryPlan::DAGNodeIndex sample_index =
- execution_plan_->addRelationalOperator(sample_op);
- insert_destination_proto->set_relational_op_index(sample_index);
-
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(sample_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_sample),
- std::forward_as_tuple(sample_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(sample_index, output_relation);
-}
-
-bool ExecutionGenerator::convertSimpleProjection(
- const QueryContext::scalar_group_id project_expressions_group_index,
- std::vector<attribute_id> *attributes) const {
- const S::QueryContext::ScalarGroup &scalar_group_proto =
- query_context_proto_->scalar_groups(project_expressions_group_index);
-
- for (int i = 0; i < scalar_group_proto.scalars_size(); ++i) {
- if (scalar_group_proto.scalars(i).data_source() != S::Scalar::ATTRIBUTE) {
- return false;
- }
- }
-
- for (int i = 0; i < scalar_group_proto.scalars_size(); ++i) {
- attributes->push_back(
- scalar_group_proto.scalars(i).GetExtension(S::ScalarAttribute::attribute_id));
- }
-
- return true;
-}
-
-void ExecutionGenerator::convertSelection(
- const P::SelectionPtr &physical_selection) {
- // Check if the Selection is only for renaming columns.
- if (physical_selection->filter_predicate() == nullptr) {
- const std::vector<E::AttributeReferencePtr> input_attributes =
- physical_selection->input()->getOutputAttributes();
- const std::vector<E::NamedExpressionPtr> &project_expressions =
- physical_selection->project_expressions();
- if (project_expressions.size() == input_attributes.size()) {
- bool has_different_attrs = false;
- for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) {
- if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) {
- has_different_attrs = true;
- break;
- }
- }
- if (!has_different_attrs) {
- const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator input_catalog_rel_it =
- physical_to_output_relation_map_.find(physical_selection->input());
- DCHECK(input_catalog_rel_it != physical_to_output_relation_map_.end());
- if (!input_catalog_rel_it->second.isStoredRelation()) {
- CatalogRelation *catalog_relation =
- const_cast<CatalogRelation*>(input_catalog_rel_it->second.relation);
- for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) {
- CatalogAttribute *catalog_attribute =
- catalog_relation->getAttributeByIdMutable(attr_idx);
- DCHECK(catalog_attribute != nullptr);
- catalog_attribute->setDisplayName(
- project_expressions[attr_idx]->attribute_alias());
- }
- physical_to_output_relation_map_.emplace(physical_selection,
- input_catalog_rel_it->second);
- return;
- }
- }
- }
- }
-
- // Convert the project expressions proto.
- const QueryContext::scalar_group_id project_expressions_group_index =
- query_context_proto_->scalar_groups_size();
- convertNamedExpressions(physical_selection->project_expressions(),
- query_context_proto_->add_scalar_groups());
-
- // Convert the predicate proto.
- QueryContext::predicate_id execution_predicate_index = QueryContext::kInvalidPredicateId;
- if (physical_selection->filter_predicate()) {
- execution_predicate_index = query_context_proto_->predicates_size();
-
- unique_ptr<const Predicate> execution_predicate(convertPredicate(physical_selection->filter_predicate()));
- query_context_proto_->add_predicates()->CopyFrom(execution_predicate->getProto());
- }
-
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_selection,
- &output_relation,
- insert_destination_proto);
-
- // Create and add a Select operator.
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_selection->input());
- DCHECK(input_relation_info != nullptr);
-
- // Use the "simple" form of the selection operator (a pure projection that
- // doesn't require any expression evaluation or intermediate copies) if
- // possible.
- std::vector<attribute_id> attributes;
- SelectOperator *op =
- convertSimpleProjection(project_expressions_group_index, &attributes)
- ? new SelectOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- *output_relation,
- insert_destination_index,
- execution_predicate_index,
- move(attributes),
- input_relation_info->isStoredRelation())
- : new SelectOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- *output_relation,
- insert_destination_index,
- execution_predicate_index,
- project_expressions_group_index,
- input_relation_info->isStoredRelation());
-
- const QueryPlan::DAGNodeIndex select_index =
- execution_plan_->addRelationalOperator(op);
- insert_destination_proto->set_relational_op_index(select_index);
-
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(select_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_selection),
- std::forward_as_tuple(select_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(select_index, output_relation);
-
- if (lip_filter_generator_ != nullptr) {
- lip_filter_generator_->addSelectionInfo(physical_selection, select_index);
- }
-}
-
-void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) {
- const std::unordered_map<physical::PhysicalPtr, CatalogRelationInfo>::const_iterator found_it =
- physical_to_output_relation_map_.find(
- top_level_physical_plan_->shared_subplan_at(physical_plan->subplan_id()));
- if (found_it != physical_to_output_relation_map_.end()) {
- physical_to_output_relation_map_.emplace(physical_plan, found_it->second);
-
- // Propagate the (ExprId -> CatalogAttribute) mapping.
- const std::vector<E::AttributeReferencePtr> &referenced_attributes =
- physical_plan->referenced_attributes();
- const std::vector<E::AttributeReferencePtr> &output_attributes =
- physical_plan->output_attributes();
- for (std::size_t i = 0; i < referenced_attributes.size(); ++i) {
- attribute_substitution_map_[output_attributes[i]->id()] =
- attribute_substitution_map_[referenced_attributes[i]->id()];
- }
- }
-}
-
-void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
- // HashJoin is converted to three operators:
- // BuildHash, HashJoin, DestroyHash. The second is the primary operator.
-
- P::PhysicalPtr probe_physical = physical_plan->left();
- P::PhysicalPtr build_physical = physical_plan->right();
-
- std::vector<attribute_id> probe_attribute_ids;
- std::vector<attribute_id> build_attribute_ids;
-
- std::size_t build_cardinality =
- cost_model_for_hash_join_->estimateCardinality(build_physical);
-
- bool any_probe_attributes_nullable = false;
- bool any_build_attributes_nullable = false;
-
- const std::vector<E::AttributeReferencePtr> &left_join_attributes =
- physical_plan->left_join_attributes();
- for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
- const CatalogAttribute *probe_catalog_attribute
- = attribute_substitution_map_[left_join_attribute->id()];
- probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());
-
- if (probe_catalog_attribute->getType().isNullable()) {
- any_probe_attributes_nullable = true;
- }
- }
-
- const std::vector<E::AttributeReferencePtr> &right_join_attributes =
- physical_plan->right_join_attributes();
- for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
- const CatalogAttribute *build_catalog_attribute
- = attribute_substitution_map_[right_join_attribute->id()];
- build_attribute_ids.emplace_back(build_catalog_attribute->getID());
-
- if (build_catalog_attribute->getType().isNullable()) {
- any_build_attributes_nullable = true;
- }
- }
-
- // Remember key types for call to SimplifyHashTableImplTypeProto() below.
- std::vector<const Type*> key_types;
- for (std::vector<E::AttributeReferencePtr>::size_type attr_idx = 0;
- attr_idx < left_join_attributes.size();
- ++attr_idx) {
- const Type &left_attribute_type = left_join_attributes[attr_idx]->getValueType();
- const Type &right_attribute_type = right_join_attributes[attr_idx]->getValueType();
- if (left_attribute_type.getTypeID() != right_attribute_type.getTypeID()) {
- THROW_SQL_ERROR() << "Equality join predicate between two attributes of different types "
- "is not allowed in HashJoin";
- }
- key_types.push_back(&left_attribute_type);
- }
-
- // Convert the residual predicate proto.
- QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
- if (physical_plan->residual_predicate()) {
- residual_predicate_index = query_context_proto_->predicates_size();
-
- unique_ptr<const Predicate> residual_predicate(convertPredicate(physical_plan->residual_predicate()));
- query_context_proto_->add_predicates()->CopyFrom(residual_predicate->getProto());
- }
-
- // Convert the project expressions proto.
- const QueryContext::scalar_group_id project_expressions_group_index =
- query_context_proto_->scalar_groups_size();
- convertNamedExpressions(physical_plan->project_expressions(),
- query_context_proto_->add_scalar_groups());
-
- const CatalogRelationInfo *build_relation_info =
- findRelationInfoOutputByPhysical(build_physical);
- const CatalogRelationInfo *probe_operator_info =
- findRelationInfoOutputByPhysical(probe_physical);
-
- // Create a vector that indicates whether each project expression is using
- // attributes from the build relation as input. This information is required
- // by the current implementation of hash left outer join
- std::unique_ptr<std::vector<bool>> is_selection_on_build;
- if (physical_plan->join_type() == P::HashJoin::JoinType::kLeftOuterJoin) {
- is_selection_on_build.reset(
- new std::vector<bool>(
- E::MarkExpressionsReferingAnyAttribute(
- physical_plan->project_expressions(),
- build_physical->getOutputAttributes())));
- }
-
- // FIXME(quickstep-team): Add support for self-join.
- if (build_relation_info->relation == probe_operator_info->relation) {
- THROW_SQL_ERROR() << "Self-join is not supported";
- }
-
- // Create join hash table proto.
- const QueryContext::join_hash_table_id join_hash_table_index =
- query_context_proto_->join_hash_tables_size();
- S::HashTable *hash_table_proto = query_context_proto_->add_join_hash_tables();
-
- // SimplifyHashTableImplTypeProto() switches the hash table implementation
- // from SeparateChaining to SimpleScalarSeparateChaining when there is a
- // single scalar key type with a reversible hash function.
- hash_table_proto->set_hash_table_impl_type(
- SimplifyHashTableImplTypeProto(
- HashTableImplTypeProtoFromString(FLAGS_join_hashtable_type),
- key_types));
-
- const CatalogRelationSchema *build_relation = build_relation_info->relation;
- for (const attribute_id build_attribute : build_attribute_ids) {
- hash_table_proto->add_key_types()->CopyFrom(
- build_relation->getAttributeById(build_attribute)->getType().getProto());
- }
-
- hash_table_proto->set_estimated_num_entries(build_cardinality);
-
- // Create three operators.
- const QueryPlan::DAGNodeIndex build_operator_index =
- execution_plan_->addRelationalOperator(
- new BuildHashOperator(
- query_handle_->query_id(),
- *build_relation_info->relation,
- build_relation_info->isStoredRelation(),
- build_attribute_ids,
- any_build_attributes_nullable,
- join_hash_table_index));
-
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_plan,
- &output_relation,
- insert_destination_proto);
-
- // Get JoinType
- HashJoinOperator::JoinType join_type;
- switch (physical_plan->join_type()) {
- case P::HashJoin::JoinType::kInnerJoin:
- join_type = HashJoinOperator::JoinType::kInnerJoin;
- break;
- case P::HashJoin::JoinType::kLeftSemiJoin:
- join_type = HashJoinOperator::JoinType::kLeftSemiJoin;
- break;
- case P::HashJoin::JoinType::kLeftAntiJoin:
- join_type = HashJoinOperator::JoinType::kLeftAntiJoin;
- break;
- case P::HashJoin::JoinType::kLeftOuterJoin:
- join_type = HashJoinOperator::JoinType::kLeftOuterJoin;
- break;
- default:
- LOG(FATAL) << "Invalid physical::HashJoin::JoinType: "
- << static_cast<typename std::underlying_type<P::HashJoin::JoinType>::type>(
- physical_plan->join_type());
- }
-
- // Create hash join operator
- const QueryPlan::DAGNodeIndex join_operator_index =
- execution_plan_->addRelationalOperator(
- new HashJoinOperator(
- query_handle_->query_id(),
- *build_relation_info->relation,
- *probe_operator_info->relation,
- probe_operator_info->isStoredRelation(),
- probe_attribute_ids,
- any_probe_attributes_nullable,
- *output_relation,
- insert_destination_index,
- join_hash_table_index,
- residual_predicate_index,
- project_expressions_group_index,
- is_selection_on_build.get(),
- join_type));
- insert_destination_proto->set_relational_op_index(join_operator_index);
-
- const QueryPlan::DAGNodeIndex destroy_operator_index =
- execution_plan_->addRelationalOperator(new DestroyHashOperator(
- query_handle_->query_id(), join_hash_table_index));
-
- if (!build_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(build_operator_index,
- build_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- // Add the dependency for the producer operator of the build relation
- // to prevent the build relation from being destroyed until after the join
- // is complete (see QueryPlan::addDependenciesForDropOperator(), which
- // makes the drop operator for the temporary relation dependent on all its
- // consumers having finished).
- execution_plan_->addDirectDependency(join_operator_index,
- build_relation_info->producer_operator_index,
- true /* is_pipeline_breaker */);
- }
- if (!probe_operator_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(join_operator_index,
- probe_operator_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- execution_plan_->addDirectDependency(join_operator_index,
- build_operator_index,
- true /* is_pipeline_breaker */);
- execution_plan_->addDirectDependency(destroy_operator_index,
- join_operator_index,
- true /* is_pipeline_breaker */);
-
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(join_operator_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
-
- if (lip_filter_generator_ != nullptr) {
- lip_filter_generator_->addHashJoinInfo(physical_plan,
- build_operator_index,
- join_operator_index);
- }
-}
-
-void ExecutionGenerator::convertNestedLoopsJoin(
- const P::NestedLoopsJoinPtr &physical_plan) {
- // NestedLoopsJoin is converted to a NestedLoopsJoin operator.
-
- // Convert the join predicate proto.
- const QueryContext::predicate_id execution_join_predicate_index = query_context_proto_->predicates_size();
- if (physical_plan->join_predicate()) {
- unique_ptr<const Predicate> execution_join_predicate(convertPredicate(physical_plan->join_predicate()));
- query_context_proto_->add_predicates()->CopyFrom(execution_join_predicate->getProto());
- } else {
- query_context_proto_->add_predicates()->set_predicate_type(S::Predicate::TRUE);
- }
-
- // Convert the project expressions proto.
- const QueryContext::scalar_group_id project_expressions_group_index =
- query_context_proto_->scalar_groups_size();
- convertNamedExpressions(physical_plan->project_expressions(),
- query_context_proto_->add_scalar_groups());
-
- const CatalogRelationInfo *left_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->left());
- const CatalogRelationInfo *right_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->right());
-
- // FIXME(quickstep-team): Add support for self-join.
- if (left_relation_info->relation == right_relation_info->relation) {
- THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet";
- }
-
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_plan,
- &output_relation,
- insert_destination_proto);
-
- // Create and add a NestedLoopsJoin operator.
- const QueryPlan::DAGNodeIndex join_operator_index =
- execution_plan_->addRelationalOperator(
- new NestedLoopsJoinOperator(query_handle_->query_id(),
- *left_relation_info->relation,
- *right_relation_info->relation,
- *output_relation,
- insert_destination_index,
- execution_join_predicate_index,
- project_expressions_group_index,
- left_relation_info->isStoredRelation(),
- right_relation_info->isStoredRelation()));
- insert_destination_proto->set_relational_op_index(join_operator_index);
-
- if (!left_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(join_operator_index,
- left_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- if (!right_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(join_operator_index,
- right_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
-
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(join_operator_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
-}
-
-void ExecutionGenerator::convertCopyFrom(
- const P::CopyFromPtr &physical_plan) {
- // CopyFrom is converted to a TextScan and a SaveBlocks.
-
- const CatalogRelation *output_relation = physical_plan->catalog_relation();
-
- // Create InsertDestination proto.
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
-
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(output_relation->getID());
- insert_destination_proto->mutable_layout()->MergeFrom(
- output_relation->getDefaultStorageBlockLayout().getDescription());
-
- const vector<block_id> blocks(physical_plan->catalog_relation()->getBlocksSnapshot());
- for (const block_id block : blocks) {
- insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
- }
-
- const QueryPlan::DAGNodeIndex scan_operator_index =
- execution_plan_->addRelationalOperator(
- new TextScanOperator(
- query_handle_->query_id(),
- physical_plan->file_name(),
- physical_plan->column_delimiter(),
- physical_plan->escape_strings(),
- *output_relation,
- insert_destination_index));
- insert_destination_proto->set_relational_op_index(scan_operator_index);
-
- const QueryPlan::DAGNodeIndex save_blocks_operator_index =
- execution_plan_->addRelationalOperator(
- new SaveBlocksOperator(query_handle_->query_id()));
- execution_plan_->addDirectDependency(save_blocks_operator_index,
- scan_operator_index,
- false /* is_pipeline_breaker */);
-}
-
-void ExecutionGenerator::convertCreateIndex(
- const P::CreateIndexPtr &physical_plan) {
- // CreateIndex is converted to a CreateIndex operator.
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- CatalogRelation *input_relation =
- catalog_database_->getRelationByIdMutable(
- input_relation_info->relation->getID());
-
- // Check if any index with the specified name already exists.
- if (input_relation->hasIndexWithName(physical_plan->index_name())) {
- THROW_SQL_ERROR() << "The relation " << input_relation->getName()
- << " already has an index named "<< physical_plan->index_name();
- }
-
- DCHECK_GT(physical_plan->index_attributes().size(), 0u);
-
- // Convert attribute references to a vector of pointers to catalog attributes.
- std::vector<const CatalogAttribute*> index_attributes;
- for (const E::AttributeReferencePtr &attribute : physical_plan->index_attributes()) {
- const CatalogAttribute *catalog_attribute
- = input_relation->getAttributeByName(attribute->attribute_name());
- DCHECK(catalog_attribute != nullptr);
- index_attributes.emplace_back(catalog_attribute);
- }
-
- // Create a copy of index description and add all the specified attributes to it.
- IndexSubBlockDescription index_description(*physical_plan->index_description());
- for (const CatalogAttribute* catalog_attribute : index_attributes) {
- index_description.add_indexed_attribute_ids(catalog_attribute->getID());
- }
- if (input_relation->hasIndexWithDescription(index_description)) {
- // Check if the given index description already exists in the relation.
- THROW_SQL_ERROR() << "The relation " << input_relation->getName()
- << " already defines this index on the given attribute(s).";
- }
- if (!SubBlockTypeRegistry::IndexDescriptionIsValid(*input_relation, index_description)) {
- // Check if the given index description is valid.
- THROW_SQL_ERROR() << "The index with given properties cannot be created.";
- }
- execution_plan_->addRelationalOperator(
- new CreateIndexOperator(query_handle_->query_id(),
- input_relation,
- physical_plan->index_name(),
- std::move(index_description)));
-}
-
-void ExecutionGenerator::convertCreateTable(
- const P::CreateTablePtr &physical_plan) {
- // CreateTable is converted to a CreateTable operator.
-
- std::unique_ptr<CatalogRelation> catalog_relation(new CatalogRelation(
- catalog_database_,
- physical_plan->relation_name(),
- -1 /* id */,
- false /* is_temporary*/));
- attribute_id aid = 0;
- for (const E::AttributeReferencePtr &attribute :
- physical_plan->attributes()) {
- std::unique_ptr<CatalogAttribute> catalog_attribute(new CatalogAttribute(
- catalog_relation.get(),
- attribute->attribute_name(),
- attribute->getValueType(),
- aid,
- attribute->attribute_alias()));
- catalog_relation->addAttribute(catalog_attribute.release());
- ++aid;
- }
-
- // If specified, set the physical block type as the users'. Otherwise,
- // the system uses the default layout.
- if (physical_plan->block_properties()) {
- if (!StorageBlockLayout::DescriptionIsValid(*catalog_relation,
- *physical_plan->block_properties())) {
- THROW_SQL_ERROR() << "BLOCKPROPERTIES is invalid.";
- }
-
- std::unique_ptr<StorageBlockLayout> layout(
- new StorageBlockLayout(*catalog_relation, *physical_plan->block_properties()));
- layout->finalize();
- catalog_relation->setDefaultStorageBlockLayout(layout.release());
- }
-
- execution_plan_->addRelationalOperator(
- new CreateTableOperator(query_handle_->query_id(),
- catalog_relation.release(),
- catalog_database_));
-}
-
-void ExecutionGenerator::convertDeleteTuples(
- const P::DeleteTuplesPtr &physical_plan) {
- // If there is a selection predicate and the predicate value
- // is not statically true, DeleteTuples is converted to
- // a DeleteOperator and a SaveBlocksOperator; if there is not
- // a selection predicate or the predicate value is statically true,
- // it is converted to a DropTableOperator; otherwise, the predicate
- // value is statically false, so no operator needs to be created.
-
- unique_ptr<const Predicate> execution_predicate;
- if (physical_plan->predicate()) {
- execution_predicate.reset(convertPredicate(physical_plan->predicate()));
- }
-
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- DCHECK(input_relation_info != nullptr);
- if (execution_predicate == nullptr ||
- (execution_predicate->hasStaticResult() &&
- execution_predicate->getStaticResult())) {
- const QueryPlan::DAGNodeIndex drop_table_index =
- execution_plan_->addRelationalOperator(
- new DropTableOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- catalog_database_,
- true /* only_drop_blocks */));
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(drop_table_index,
- input_relation_info->producer_operator_index,
- true /* is_pipeline_breaker */);
- }
- } else if (!execution_predicate->hasStaticResult()) {
- const QueryContext::predicate_id execution_predicate_index = query_context_proto_->predicates_size();
- query_context_proto_->add_predicates()->CopyFrom(execution_predicate->getProto());
-
- const QueryPlan::DAGNodeIndex delete_tuples_index =
- execution_plan_->addRelationalOperator(
- new DeleteOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- execution_predicate_index,
- input_relation_info->isStoredRelation()));
-
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(delete_tuples_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
-
- const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(
- new SaveBlocksOperator(query_handle_->query_id()));
- execution_plan_->addDirectDependency(save_blocks_index,
- delete_tuples_index,
- false /* is_pipeline_breaker */);
- }
-}
-
-void ExecutionGenerator::convertDropTable(
- const P::DropTablePtr &physical_plan) {
- // DropTable is converted to a DropTable operator.
- const CatalogRelation &catalog_relation = *physical_plan->catalog_relation();
-
-#ifdef QUICKSTEP_DISTRIBUTED
- referenced_relation_ids_.insert(catalog_relation.getID());
-#endif
-
- execution_plan_->addRelationalOperator(
- new DropTableOperator(query_handle_->query_id(),
- catalog_relation,
- catalog_database_));
-}
-
-void ExecutionGenerator::convertInsertTuple(
- const P::InsertTuplePtr &physical_plan) {
- // InsertTuple is converted to an Insert and a SaveBlocks.
-
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- const CatalogRelation &input_relation =
- *catalog_database_->getRelationById(
- input_relation_info->relation->getID());
-
- // Construct the tuple proto to be inserted.
- const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
-
- S::Tuple *tuple_proto = query_context_proto_->add_tuples();
- for (const E::ScalarLiteralPtr &literal : physical_plan->column_values()) {
- tuple_proto->add_attribute_values()->CopyFrom(literal->value().getProto());
- }
-
- // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
- // block supports ad-hoc insertion instead of hard-coding the block types.
- const StorageBlockLayout &storage_block_layout =
- input_relation.getDefaultStorageBlockLayout();
- if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
- storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
- THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
- << input_relation.getName()
- << ", because its storage blocks do not support ad-hoc insertion";
- }
-
- // Create InsertDestination proto.
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
-
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(input_relation.getID());
- insert_destination_proto->mutable_layout()->MergeFrom(
- input_relation.getDefaultStorageBlockLayout().getDescription());
-
- const vector<block_id> blocks(input_relation.getBlocksSnapshot());
- for (const block_id block : blocks) {
- insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
- }
-
- const QueryPlan::DAGNodeIndex insert_operator_index =
- execution_plan_->addRelationalOperator(
- new InsertOperator(query_handle_->query_id(),
- input_relation,
- insert_destination_index,
- tuple_index));
- insert_destination_proto->set_relational_op_index(insert_operator_index);
-
- const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(
- new SaveBlocksOperator(query_handle_->query_id()));
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(insert_operator_index,
- input_relation_info->producer_operator_index,
- true /* is_pipeline_breaker */);
- }
- execution_plan_->addDirectDependency(save_blocks_index,
- insert_operator_index,
- false /* is_pipeline_breaker */);
-}
-
-void ExecutionGenerator::convertInsertSelection(
- const P::InsertSelectionPtr &physical_plan) {
- // InsertSelection is converted to a Select and a SaveBlocks.
-
- const CatalogRelationInfo *destination_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->destination());
- const CatalogRelation &destination_relation = *destination_relation_info->relation;
-
- // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
- // block supports ad-hoc insertion instead of hard-coding the block types.
- const StorageBlockLayout &storage_block_layout =
- destination_relation.getDefaultStorageBlockLayout();
- if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE
- || storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
- THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
- << destination_relation.getName()
- << ", because its storage blocks do not support ad-hoc insertion";
- }
-
- // Create InsertDestination proto.
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
-
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(destination_relation.getID());
- insert_destination_proto->mutable_layout()->MergeFrom(
- destination_relation.getDefaultStorageBlockLayout().getDescription());
-
- const vector<block_id> blocks(destination_relation.getBlocksSnapshot());
- for (const block_id block : blocks) {
- insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
- }
-
- const CatalogRelationInfo *selection_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->selection());
-
- // Prepare the attributes, which are output columns of the selection relation.
- std::vector<attribute_id> attributes;
- for (E::AttributeReferencePtr attr_ref : physical_plan->selection()->getOutputAttributes()) {
- unique_ptr<const Scalar> attribute(attr_ref->concretize(attribute_substitution_map_));
-
- DCHECK_EQ(Scalar::kAttribute, attribute->getDataSource());
- attributes.push_back(
- static_cast<const ScalarAttribute*>(attribute.get())->getAttribute().getID());
- }
-
- // Create the select operator.
- // TODO(jianqiao): This select operator is actually redundant. That is,
- // we may directly set physical_plan_->selection()'s output relation to be
- // destination_relation, instead of creating an intermediate selection_relation
- // and then copy the data into destination_relation. One way to achieve this
- // optimization is to enable specifying a specific output relation for each
- // physical plan by modifying class Physical.
- SelectOperator *insert_selection_op =
- new SelectOperator(query_handle_->query_id(),
- *selection_relation_info->relation,
- destination_relation,
- insert_destination_index,
- QueryContext::kInvalidPredicateId,
- move(attributes),
- selection_relation_info->isStoredRelation());
-
- const QueryPlan::DAGNodeIndex insert_selection_index =
- execution_plan_->addRelationalOperator(insert_selection_op);
- insert_destination_proto->set_relational_op_index(insert_selection_index);
-
- const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(new SaveBlocksOperator(query_handle_->query_id()));
-
- if (!selection_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(insert_selection_index,
- selection_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- execution_plan_->addDirectDependency(save_blocks_index,
- insert_selection_index,
- false /* is_pipeline_breaker */);
-}
-
-void ExecutionGenerator::convertUpdateTable(
- const P::UpdateTablePtr &physical_plan) {
- // UpdateTable is converted to an Update and a SaveBlocks.
-
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- DCHECK(input_relation_info != nullptr);
-
- const relation_id input_rel_id = input_relation_info->relation->getID();
-
- // Create InsertDestination proto.
- const QueryContext::insert_destination_id relocation_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *relocation_destination_proto = query_context_proto_->add_insert_destinations();
-
- relocation_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
- relocation_destination_proto->set_relation_id(input_rel_id);
-
- // Convert the predicate proto.
- QueryContext::predicate_id execution_predicate_index = QueryContext::kInvalidPredicateId;
- if (physical_plan->predicate()) {
- execution_predicate_index = query_context_proto_->predicates_size();
-
- unique_ptr<const Predicate> execution_predicate(convertPredicate(physical_plan->predicate()));
- query_context_proto_->add_predicates()->CopyFrom(execution_predicate->getProto());
- }
-
- // Convert assignment expressions as a UpdateGroup proto.
- const vector<E::AttributeReferencePtr> &assignees = physical_plan->assignees();
- const vector<E::ScalarPtr> &assignment_expressions = physical_plan->assignment_expressions();
-
- DCHECK_EQ(assignees.size(), assignment_expressions.size())
- << physical_plan->toString();
-
- const QueryContext::update_group_id update_group_index = query_context_proto_->update_groups_size();
- S::QueryContext::UpdateGroup *update_group_proto = query_context_proto_->add_update_groups();
- update_group_proto->set_relation_id(input_rel_id);
-
- for (vector<E::AttributeReferencePtr>::size_type i = 0; i < assignees.size(); ++i) {
- unique_ptr<const Scalar> attribute(
- assignees[i]->concretize(attribute_substitution_map_));
- DCHECK_EQ(Scalar::kAttribute, attribute->getDataSource())
- << assignees[i]->toString();
-
- S::QueryContext::UpdateGroup::UpdateAssignment *update_assignment_proto =
- update_group_proto->add_update_assignments();
-
- update_assignment_proto->set_attribute_id(
- static_cast<const ScalarAttribute*>(attribute.get())->getAttribute().getID());
-
- unique_ptr<const Scalar> value(
- assignment_expressions[i]->concretize(attribute_substitution_map_));
- update_assignment_proto->mutable_scalar()->CopyFrom(value->getProto());
- }
-
- const QueryPlan::DAGNodeIndex update_operator_index =
- execution_plan_->addRelationalOperator(new UpdateOperator(
- query_handle_->query_id(),
- *catalog_database_->getRelationById(
- input_rel_id),
- relocation_destination_index,
- execution_predicate_index,
- update_group_index));
- relocation_destination_proto->set_relational_op_index(update_operator_index);
-
- const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(
- new SaveBlocksOperator(query_handle_->query_id()));
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(update_operator_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- execution_plan_->addDirectDependency(save_blocks_index,
- update_operator_index,
- false /* is_pipeline_breaker */);
-}
-
-void ExecutionGenerator::convertAggregate(
- const P::AggregatePtr &physical_plan) {
- // Create aggr state proto.
- const QueryContext::aggregation_state_id aggr_state_index =
- query_context_proto_->aggregation_states_size();
- S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
-
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
-
- std::vector<const Type*> group_by_types;
- for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
- unique_ptr<const Scalar> execution_group_by_expression;
- E::AliasPtr alias;
- if (E::SomeAlias::MatchesWithConditionalCast(grouping_expression, &alias)) {
- E::ScalarPtr scalar;
- // NOTE(zuyu): For aggregate expressions, all child expressions of an
- // Alias should be a Scalar.
- CHECK(E::SomeScalar::MatchesWithConditionalCast(alias->expression(), &scalar))
- << alias->toString();
- execution_group_by_expression.reset(scalar->concretize(attribute_substitution_map_));
- } else {
- execution_group_by_expression.reset(
- grouping_expression->concretize(attribute_substitution_map_));
- }
- aggr_state_proto->add_group_by_expressions()->CopyFrom(execution_group_by_expression->getProto());
- group_by_types.push_back(&execution_group_by_expression->getType());
- }
-
- if (!group_by_types.empty()) {
- // Right now, only SeparateChaining is supported.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
- }
-
- for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
- const E::AggregateFunctionPtr unnamed_aggregate_expression =
- std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
-
- // Add a new entry in 'aggregates'.
- S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
-
- // Set the AggregateFunction.
- aggr_proto->mutable_function()->CopyFrom(
- unnamed_aggregate_expression->getAggregate().getProto());
-
- // Add each of the aggregate's arguments.
- for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
- unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
- aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
- }
-
- // Set whether it is a DISTINCT aggregation.
- aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct());
-
- // Add distinctify hash table impl type if it is a DISTINCT aggregation.
- if (unnamed_aggregate_expression->is_distinct()) {
- const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
- DCHECK_GE(arguments.size(), 1u);
- // Right now only SeparateChaining implementation is supported.
- aggr_state_proto->add_distinctify_hash_table_impl_types(
- serialization::HashTableImplType::SEPARATE_CHAINING);
- }
- }
-
- if (physical_plan->filter_predicate() != nullptr) {
- unique_ptr<const Predicate> predicate(convertPredicate(physical_plan->filter_predicate()));
- aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
- }
-
- const std::size_t estimated_num_groups =
- cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
- aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
- const QueryPlan::DAGNodeIndex aggregation_operator_index =
- execution_plan_->addRelationalOperator(
- new AggregationOperator(
- query_handle_->query_id(),
- *input_relation_info->relation,
- input_relation_info->isStoredRelation(),
- aggr_state_index));
-
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(aggregation_operator_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
-
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_plan,
- &output_relation,
- insert_destination_proto);
-
- const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
- execution_plan_->addRelationalOperator(
- new FinalizeAggregationOperator(query_handle_->query_id(),
- aggr_state_index,
- *output_relation,
- insert_destination_index));
-
- insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
-
- execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
- aggregation_operator_index,
- true /* is_pipeline_breaker */);
-
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
- temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
- output_relation);
-
- const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
- execution_plan_->addRelationalOperator(
- new DestroyAggregationStateOperator(query_handle_->query_id(),
- aggr_state_index));
-
- execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
- finalize_aggregation_operator_index,
- true);
-
- if (lip_filter_generator_ != nullptr) {
- lip_filter_generator_->addAggregateInfo(physical_plan,
- aggregation_operator_index);
- }
-}
-
-void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
- // Create sort configuration for run generation.
- vector<bool> sort_ordering(physical_sort->sort_ascending());
- vector<bool> sort_null_ordering(physical_sort->nulls_first_flags());
- PtrVector<Scalar> sort_run_gen_attributes;
- for (const E::AttributeReferencePtr &sort_attribute :
- physical_sort->sort_attributes()) {
- sort_run_gen_attributes.push_back(
- sort_attribute->concretize(attribute_substitution_map_));
- }
- const SortConfiguration sort_run_gen_config(sort_run_gen_attributes,
- std::move(sort_ordering),
- std::move(sort_null_ordering));
- const QueryContext::sort_config_id sort_run_gen_config_id =
- query_context_proto_->sort_configs_size();
- S::SortConfiguration *sort_run_gen_config_proto =
- query_context_proto_->add_sort_configs();
- sort_run_gen_config_proto->CopyFrom(sort_run_gen_config.getProto());
-
- // Create SortRunGenerationOperator.
- const CatalogRelation *initial_runs_relation;
- const QueryContext::insert_destination_id initial_runs_destination_id =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *initial_runs_destination_proto =
- query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(
- physical_sort, &initial_runs_relation, initial_runs_destination_proto);
-
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_sort->input());
- const QueryPlan::DAGNodeIndex run_generator_index =
- execution_plan_->addRelationalOperator(new SortRunGenerationOperator(
- query_handle_->query_id(),
- *input_relation_info->relation,
- *initial_runs_relation,
- initial_runs_destination_id,
- sort_run_gen_config_id,
- input_relation_info->isStoredRelation()));
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(run_generator_index,
- input_relation_info->producer_operator_index,
- false /* is_pipeline_breaker */);
- }
- temporary_relation_info_vec_.emplace_back(run_generator_index,
- initial_runs_relation);
- initial_runs_destination_proto->set_relational_op_index(run_generator_index);
-
- // Create sort configuration for run merging.
- sort_ordering = physical_sort->sort_ascending();
- sort_null_ordering = physical_sort->nulls_first_flags();
- PtrVector<Scalar> sort_merge_run_attributes;
- for (const E::AttributeReferencePtr &sort_attribute :
- physical_sort->sort_attributes()) {
- sort_merge_run_attributes.push_back(
- sort_attribute->concretize(attribute_substitution_map_));
- }
- const SortConfiguration sort_merge_run_config(sort_merge_run_attributes,
- std::move(sort_ordering),
- std::move(sort_null_ordering));
- const QueryContext::sort_config_id sort_merge_run_config_id =
- query_context_proto_->sort_configs_size();
- S::SortConfiguration *sort_merge_run_config_proto =
- query_context_proto_->add_sort_configs();
- sort_merge_run_config_proto->CopyFrom(sort_merge_run_config.getProto());
-
- // Create SortMergeRunOperator.
- const CatalogRelation *merged_runs_relation;
- const QueryContext::insert_destination_id merged_runs_destination_id =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *merged_runs_destination_proto =
- query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_sort,
- &merged_runs_relation,
- merged_runs_destination_proto);
- const CatalogRelation *sorted_relation;
- const QueryContext::insert_destination_id sorted_output_destination_id =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *sorted_output_destination_proto =
- query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_sort,
- &sorted_relation,
- sorted_output_destination_proto);
-
- // TODO(qzeng): Make the merge factor configurable.
- const QueryPlan::DAGNodeIndex merge_run_operator_index =
- execution_plan_->addRelationalOperator(new SortMergeRunOperator(
- query_handle_->query_id(),
- *initial_runs_relation,
- *sorted_relation,
- sorted_output_destination_id,
- *merged_runs_relation,
- merged_runs_destination_id,
- sort_merge_run_config_id,
- 64 /* merge_factor */,
- physical_sort->limit(),
- false /* input_relation_is_stored */));
-
- execution_plan_->addDirectDependency(merge_run_operator_index,
- run_generator_index,
- false /* is_pipeline_breaker */);
- merged_runs_destination_proto->set_relational_op_index(merge_run_operator_index);
- sorted_output_destination_proto->set_relational_op_index(merge_run_operator_index);
-
- // Do not add merged_runs_relation into 'temporary_relation_info_vec_'
- // and create the DropTableOperator for it at the end. Instead, add the drop
- // operator right here, because the relation won't be used by any other operator.
- const QueryPlan::DAGNodeIndex drop_merged_runs_index =
- execution_plan_->addRelationalOperator(
- new DropTableOperator(
- query_handle_->query_id(),
- *merged_runs_relation,
- catalog_database_,
- false /* only_drop_blocks */));
- execution_plan_->addDirectDependency(
- drop_merged_runs_index,
- merge_run_operator_index,
- true /* is_pipeline_breaker */);
-
- temporary_relation_info_vec_.emplace_back(merge_run_operator_index,
- sorted_relation);
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_sort),
- std::forward_as_tuple(merge_run_operator_index,
- sorted_relation));
-}
-
-void ExecutionGenerator::convertTableGenerator(
- const P::TableGeneratorPtr &physical_tablegen) {
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto =
- query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_tablegen,
- &output_relation,
- insert_destination_proto);
-
- // Create GeneratorFunctionHandle proto
- const QueryContext::generator_function_id generator_function_index =
- query_context_proto_->generator_functions_size();
- query_context_proto_->add_generator_functions()->CopyFrom(
- physical_tablegen->generator_function_handle()->getProto());
-
- TableGeneratorOperator *op =
- new TableGeneratorOperator(query_handle_->query_id(),
- *output_relation,
- insert_destination_index,
- generator_function_index);
-
- const QueryPlan::DAGNodeIndex tablegen_index =
- execution_plan_->addRelationalOperator(op);
- insert_destination_proto->set_relational_op_index(tablegen_index);
-
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_tablegen),
- std::forward_as_tuple(tablegen_index,
- output_relation));
- temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
-}
-
-void ExecutionGenerator::convertWindowAggregate(
- const P::WindowAggregatePtr &physical_plan) {
- // Create window_aggregation_operation_state proto.
- const QueryContext::window_aggregation_state_id window_aggr_state_index =
- query_context_proto_->window_aggregation_states_size();
- S::WindowAggregationOperationState *window_aggr_state_proto =
- query_context_proto_->add_window_aggregation_states();
-
- // Get input.
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
-
- // Get window aggregate function expression.
- const E::AliasPtr &named_window_aggregate_expression =
- physical_plan->window_aggregate_expression();
- const E::WindowAggregateFunctionPtr &window_aggregate_function =
- std::static_pointer_cast<const E::WindowAggregateFunction>(
- named_window_aggregate_expression->expression());
-
- // Set the WindowAggregateFunction.
- window_aggr_state_proto->mutable_function()->MergeFrom(
- window_aggregate_function->window_aggregate().getProto());
-
- // Set the arguments.
- for (const E::ScalarPtr &argument : window_aggregate_function->arguments()) {
- unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
- window_aggr_state_proto->add_arguments()->MergeFrom(concretized_argument->getProto());
- }
-
- // Set partition keys.
- const E::WindowInfo &window_info = window_aggregate_function->window_info();
- for (const E::ScalarPtr &partition_by_attribute
- : window_info.partition_by_attributes) {
- unique_ptr<const Scalar> concretized_partition_by_attribute(
- partition_by_attribute->concretize(attribute_substitution_map_));
- window_aggr_state_proto->add_partition_by_attributes()
- ->MergeFrom(concretized_partition_by_attribute->getProto());
- }
-
- // Set order keys.
- for (const E::ScalarPtr &order_by_attribute
- : window_info.order_by_attributes) {
- unique_ptr<const Scalar> concretized_order_by_attribute(
- order_by_attribute->concretize(attribute_substitution_map_));
- window_aggr_state_proto->add_order_by_attributes()
- ->MergeFrom(concretized_order_by_attribute->getProto());
- }
-
- // Set window frame info.
- if (window_info.frame_info == nullptr) {
- // If the frame is not specified, use the default setting:
- // 1. If ORDER BY key is specified, use cumulative aggregation:
- // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
- // 2. If ORDER BY key is not specified either, use the whole partition:
- // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
- window_aggr_state_proto->set_is_row(true); // frame mode: ROWS.
- window_aggr_state_proto->set_num_preceding(-1); // UNBOUNDED PRECEDING.
- window_aggr_state_proto->set_num_following(
- window_info.order_by_attributes.empty()
- ? -1 // UNBOUNDED FOLLOWING.
- : 0); // CURRENT ROW.
- } else {
- const E::WindowFrameInfo *window_frame_info = window_info.frame_info;
- window_aggr_state_proto->set_is_row(window_frame_info->is_row);
- window_aggr_state_proto->set_num_preceding(window_frame_info->num_preceding);
- window_aggr_state_proto->set_num_following(window_frame_info->num_following);
- }
-
- // Create InsertDestination proto.
- const CatalogRelation *output_relation = nullptr;
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
- createTemporaryCatalogRelation(physical_plan,
- &output_relation,
- insert_destination_proto);
-
- const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
- execution_plan_->addRelationalOperator(
- new WindowAggregationOperator(query_handle_->query_id(),
- *input_relation_info->relation,
- *output_relation,
- window_aggr_state_index,
- insert_destination_index));
-
- // TODO(Shixuan): Once parallelism is introduced, the is_pipeline_breaker
- // could be set to false.
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(window_aggregation_operator_index,
- input_relation_info->producer_operator_index,
- true /* is_pipeline_breaker */);
- }
-
- insert_destination_proto->set_relational_op_index(window_aggregation_operator_index);
-
- // Add to map and temp_relation_info_vec.
- physical_to_output_relation_map_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(physical_plan),
- std::forward_as_tuple(window_aggregation_operator_index, output_relation));
- temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index,
- output_relation);
-}
-
-} // namespace optimizer
-} // namespace quickstep