Posted to commits@flink.apache.org by go...@apache.org on 2023/01/09 04:31:40 UTC
[flink] branch master updated: [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d9f9b55f82d [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns
d9f9b55f82d is described below
commit d9f9b55f82dfbc1676572cc36b718a99001497f8
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Dec 12 12:06:38 2022 +0800
[FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns
This closes #21489
---
.../FlinkDynamicPartitionPruningProgram.java | 148 +++++
.../batch/DynamicPartitionPruningRule.java | 739 ---------------------
.../utils/DynamicPartitionPruningUtils.java | 699 +++++++++++--------
.../planner/plan/metadata/FlinkRelMdRowCount.scala | 18 +-
.../plan/optimize/program/FlinkBatchProgram.scala | 4 +
.../planner/plan/rules/FlinkBatchRuleSets.scala | 21 +-
.../DynamicPartitionPruningProgramTest.java} | 196 +++++-
.../DynamicPartitionPruningProgramTest.xml} | 375 +++++++++--
8 files changed, 1102 insertions(+), 1098 deletions(-)
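For context, a minimal sketch (not part of this commit) of how a user job enables the option that the new program checks, and the kind of partitioned-fact / filtered-dim join the rewrite targets. The table and column names (fact, dim, fact_partition_key, dim_key, price, id, amount) are invented for illustration, and both tables are assumed to be registered in the catalog beforehand:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.OptimizerConfigOptions;

    public class DynamicFilteringExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Dynamic partition pruning is only attempted when this option is enabled.
            tEnv.getConfig()
                    .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);

            // Hypothetical tables: 'fact' is partitioned on fact_partition_key, 'dim' is small
            // and carries a filter, so the join matches the dpp pattern described in the diff below.
            tEnv.executeSql(
                            "SELECT f.id, f.amount "
                                    + "FROM fact f JOIN dim d ON f.fact_partition_key = d.dim_key "
                                    + "WHERE d.price < 100")
                    .print();
        }
    }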
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java
new file mode 100644
index 00000000000..22aebd7fcab
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase. It can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
+ * * +- Exchange(distribution=[broadcast])
+ * * +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs to have some filter condition.
+ * * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * : +- DynamicFilteringDataCollector(fields=[dim_key])
+ * : +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * : +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ * +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs to have some filter condition.
+ * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>Note: We use a {@link FlinkOptimizeProgram} instead of a {@link
+ * org.apache.calcite.plan.RelRule} here because the {@link org.apache.calcite.plan.hep.HepPlanner}
+ * doesn't support matching a partially determined pattern or dynamically replacing the inputs of
+ * matched nodes. Once the {@link org.apache.calcite.plan.hep.HepPlanner} is improved, this class
+ * can be converted to a {@link org.apache.calcite.plan.RelRule}.
+ */
+public class FlinkDynamicPartitionPruningProgram
+ implements FlinkOptimizeProgram<BatchOptimizeContext> {
+
+ @Override
+ public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+ if (!ShortcutUtils.unwrapContext(root)
+ .getTableConfig()
+ .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+ return root;
+ }
+ DefaultRelShuttle shuttle =
+ new DefaultRelShuttle() {
+ @Override
+ public RelNode visit(RelNode rel) {
+ if (!(rel instanceof Join)
+ || !DynamicPartitionPruningUtils.isSuitableJoin((Join) rel)) {
+ List<RelNode> newInputs = new ArrayList<>();
+ for (RelNode input : rel.getInputs()) {
+ RelNode newInput = input.accept(this);
+ newInputs.add(newInput);
+ }
+ return rel.copy(rel.getTraitSet(), newInputs);
+ }
+ Join join = (Join) rel;
+ JoinInfo joinInfo = join.analyzeCondition();
+ RelNode leftSide = join.getLeft();
+ RelNode rightSide = join.getRight();
+ Join newJoin = join;
+ boolean changed = false;
+ if (DynamicPartitionPruningUtils.isDppDimSide(leftSide)) {
+ if (join.getJoinType() != JoinRelType.RIGHT) {
+ Tuple2<Boolean, RelNode> relTuple =
+ DynamicPartitionPruningUtils
+ .canConvertAndConvertDppFactSide(
+ rightSide,
+ joinInfo.rightKeys,
+ leftSide,
+ joinInfo.leftKeys);
+ changed = relTuple.f0;
+ newJoin =
+ join.copy(
+ join.getTraitSet(),
+ Arrays.asList(leftSide, relTuple.f1.accept(this)));
+ }
+ } else if (DynamicPartitionPruningUtils.isDppDimSide(rightSide)) {
+ if (join.getJoinType() != JoinRelType.LEFT) {
+ Tuple2<Boolean, RelNode> relTuple =
+ DynamicPartitionPruningUtils
+ .canConvertAndConvertDppFactSide(
+ leftSide,
+ joinInfo.leftKeys,
+ rightSide,
+ joinInfo.rightKeys);
+ changed = relTuple.f0;
+ newJoin =
+ join.copy(
+ join.getTraitSet(),
+ Arrays.asList(relTuple.f1.accept(this), rightSide));
+ }
+ }
+
+ if (changed) {
+ return newJoin;
+ } else {
+ return newJoin.copy(
+ newJoin.getTraitSet(),
+ Arrays.asList(
+ newJoin.getLeft().accept(this),
+ newJoin.getRight().accept(this)));
+ }
+ }
+ };
+ return shuttle.visit(root);
+ }
+}
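The program above only rewrites scans whose table source implements SupportsDynamicFiltering: the planner asks the source which fields can prune partitions via listAcceptedFilterFields() and pushes the chosen subset back via applyDynamicFiltering(...). Below is a minimal, hypothetical connector-side sketch of that contract; the class name and field values are invented, and the runtime provider is omitted (a real source must return a FLIP-27 SourceProvider for dpp to apply, as checked in DynamicPartitionPruningUtils):

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;

    public class FactTableSourceSketch implements ScanTableSource, SupportsDynamicFiltering {

        private List<String> dynamicFilteringFields = Collections.emptyList();

        @Override
        public List<String> listAcceptedFilterFields() {
            // Typically the partition keys of the fact table.
            return Collections.singletonList("fact_partition_key");
        }

        @Override
        public void applyDynamicFiltering(List<String> candidateFilterFields) {
            // Remember the fields; at runtime the source only reads partitions whose values
            // for these fields were produced by the DynamicFilteringDataCollector on the dim side.
            this.dynamicFilteringFields = candidateFilterFields;
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
            // Omitted in this sketch; a real implementation would return a provider backed by a
            // FLIP-27 source, otherwise isNewSource(...) rejects the table for dpp.
            throw new UnsupportedOperationException("omitted in this sketch");
        }

        @Override
        public DynamicTableSource copy() {
            FactTableSourceSketch copy = new FactTableSourceSketch();
            copy.dynamicFilteringFields = dynamicFilteringFields;
            return copy;
        }

        @Override
        public String asSummaryString() {
            return "FactTableSourceSketch";
        }
    }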
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
deleted file mode 100644
index b0767d27f05..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
+++ /dev/null
@@ -1,739 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.physical.batch;
-
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalJoinBase;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.calcite.tools.RuleSets;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Planner rule that tries to do partition prune in the execution phase, which can translate a
- * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
- * whose source is a partition source. The {@link
- * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true.
- *
- * <p>Suppose we have the original physical plan:
- *
- * <pre>{@code
- * LogicalProject(...)
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- * * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
- * * +- Exchange(distribution=[broadcast])
- * * +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- * * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- *
- * <p>This physical plan will be rewritten to:
- *
- * <pre>{@code
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
- * : +- DynamicFilteringDataCollector(fields=[dim_key])
- * : +- Calc(select=[xxx], where=[<(xxx, xxx)])
- * : +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * +- Exchange(distribution=[broadcast])
- * +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- */
-public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config> {
-
- // To support more patterns of dynamic partition pruning, in this rule base, we provide eight
- // different matching rules according to the different situation of the fact side (partition
- // table side) and the different order of left/right join.
- public static final RuleSet DYNAMIC_PARTITION_PRUNING_RULES =
- RuleSets.ofList(
- DynamicPartitionPruningFactInRightRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInLeftRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInRightWithExchangeRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInLeftWithExchangeRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInRightWithCalcRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInLeftWithCalcRule.Config.DEFAULT.toRule(),
- DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config.DEFAULT
- .toRule(),
- DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config.DEFAULT
- .toRule());
-
- protected DynamicPartitionPruningRule(RelRule.Config config) {
- super(config);
- }
-
- private static List<Integer> getAcceptedFieldIndices(
- List<Integer> factJoinKeys,
- @Nullable BatchPhysicalCalc factCalc,
- BatchPhysicalTableSourceScan factScan,
- DynamicTableSource tableSource) {
- List<String> candidateFields;
- if (factCalc == null) {
- candidateFields =
- factJoinKeys.stream()
- .map(i -> factScan.getRowType().getFieldNames().get(i))
- .collect(Collectors.toList());
- } else {
- // Changing the fact key index in fact table calc output to fact key index in fact
- // table, and filtering these fields that computing in calc node.
- RexProgram program = factCalc.getProgram();
- List<Integer> joinKeysIndexInFactTable = new ArrayList<>();
- for (int joinKeyIdx : factJoinKeys) {
- RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx));
- if (node instanceof RexInputRef) {
- joinKeysIndexInFactTable.add(((RexInputRef) node).getIndex());
- }
- }
-
- if (joinKeysIndexInFactTable.isEmpty()) {
- return Collections.emptyList();
- }
-
- candidateFields =
- joinKeysIndexInFactTable.stream()
- .map(i -> factScan.getRowType().getFieldNames().get(i))
- .collect(Collectors.toList());
- }
-
- List<String> acceptedFilterFields =
- DynamicPartitionPruningUtils.getSuitableDynamicFilteringFieldsInFactSide(
- tableSource, candidateFields);
- // Apply suitable accepted filter fields to source.
- ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
-
- if (factCalc == null) {
- return acceptedFilterFields.stream()
- .map(f -> factScan.getRowType().getFieldNames().indexOf(f))
- .collect(Collectors.toList());
- } else {
- return getAcceptedFieldsIndicesInCalc(
- acceptedFilterFields, factJoinKeys, factCalc, factScan);
- }
- }
-
- private static List<Integer> getAcceptedFieldsIndicesInCalc(
- List<String> acceptedFields,
- List<Integer> factJoinKeys,
- BatchPhysicalCalc factCalc,
- BatchPhysicalTableSourceScan factScan) {
- List<Integer> acceptedFieldsIndicesInFactScan =
- acceptedFields.stream()
- .map(f -> factScan.getRowType().getFieldNames().indexOf(f))
- .collect(Collectors.toList());
- RexProgram program = factCalc.getProgram();
- List<Integer> acceptedFieldsIndicesInCalc = new ArrayList<>();
- for (int joinKeyIdx : factJoinKeys) {
- RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx));
- if (node instanceof RexInputRef
- && acceptedFieldsIndicesInFactScan.contains(((RexInputRef) node).getIndex())) {
- acceptedFieldsIndicesInCalc.add(joinKeyIdx);
- }
- }
-
- return acceptedFieldsIndicesInCalc;
- }
-
- protected BatchPhysicalDynamicFilteringTableSourceScan createDynamicFilteringTableSourceScan(
- BatchPhysicalTableSourceScan factScan,
- BatchPhysicalRel dimSide,
- BatchPhysicalJoinBase join,
- @Nullable BatchPhysicalCalc factCalc,
- boolean factInLeft) {
- JoinInfo joinInfo = join.analyzeCondition();
- TableSourceTable tableSourceTable = factScan.getTable().unwrap(TableSourceTable.class);
- DynamicTableSource tableSource = tableSourceTable.tableSource();
-
- List<Integer> factJoinKeys = factInLeft ? joinInfo.leftKeys : joinInfo.rightKeys;
- List<Integer> dimJoinKeys = factInLeft ? joinInfo.rightKeys : joinInfo.leftKeys;
-
- List<Integer> acceptedFieldIndices =
- getAcceptedFieldIndices(factJoinKeys, factCalc, factScan, tableSource);
-
- List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
- for (int i = 0; i < joinInfo.leftKeys.size(); ++i) {
- if (acceptedFieldIndices.contains(factJoinKeys.get(i))) {
- dynamicFilteringFieldIndices.add(dimJoinKeys.get(i));
- }
- }
- final BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
- createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
- return new BatchPhysicalDynamicFilteringTableSourceScan(
- factScan.getCluster(),
- factScan.getTraitSet(),
- factScan.getHints(),
- factScan.tableSourceTable(),
- dynamicFilteringDataCollector);
- }
-
- private BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
- RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
- final RelDataType outputType =
- ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
- .projectStructType(
- dimSide.getRowType(),
- dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
- return new BatchPhysicalDynamicFilteringDataCollector(
- dimSide.getCluster(),
- dimSide.getTraitSet(),
- ignoreExchange(dimSide),
- outputType,
- dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
- }
-
- private RelNode ignoreExchange(RelNode dimSide) {
- if (dimSide instanceof Exchange) {
- return dimSide.getInput(0);
- } else {
- return dimSide;
- }
- }
-
- /** Simple dynamic filtering pattern with fact side in join right. */
- protected static class DynamicPartitionPruningFactInRightRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInRightRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalRel
- .class)
- .anyInputs(),
- r ->
- r.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs()))
- .as(DynamicPartitionPruningFactInRightRule.Config.class);
-
- @Override
- default DynamicPartitionPruningFactInRightRule toRule() {
- return new DynamicPartitionPruningFactInRightRule(this);
- }
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalRel dimSide = call.rel(1);
- final BatchPhysicalTableSourceScan factScan = call.rel(2);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newFactScan));
- call.transformTo(newJoin);
- }
- }
-
- /** Simple dynamic filtering pattern with fact side in join left. */
- protected static class DynamicPartitionPruningFactInLeftRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInLeftRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInLeftRule toRule() {
- return new DynamicPartitionPruningFactInLeftRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs(),
- r ->
- r.operand(
- BatchPhysicalRel
- .class)
- .anyInputs()))
- .as(DynamicPartitionPruningFactInLeftRule.Config.class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalTableSourceScan factScan = call.rel(1);
- final BatchPhysicalRel dimSide = call.rel(2);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newFactScan, dimSide));
- call.transformTo(newJoin);
- }
- }
-
- /** Dynamic filtering pattern with exchange node in fact side while fact side in join right. */
- protected static class DynamicPartitionPruningFactInRightWithExchangeRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInRightWithExchangeRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInRightWithExchangeRule toRule() {
- return new DynamicPartitionPruningFactInRightWithExchangeRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalRel
- .class)
- .anyInputs(),
- r ->
- r.operand(
- BatchPhysicalExchange
- .class)
- .oneInput(
- e ->
- e.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs())))
- .as(DynamicPartitionPruningFactInRightWithExchangeRule.Config.class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalRel dimSide = call.rel(1);
- final BatchPhysicalExchange exchange = call.rel(2);
- final BatchPhysicalTableSourceScan factScan = call.rel(3);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
- final BatchPhysicalExchange newExchange =
- (BatchPhysicalExchange)
- exchange.copy(
- exchange.getTraitSet(), Collections.singletonList(newFactScan));
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange));
- call.transformTo(newJoin);
- }
- }
-
- /** Dynamic filtering pattern with exchange node in fact side while fact side in join left. */
- protected static class DynamicPartitionPruningFactInLeftWithExchangeRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInLeftWithExchangeRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInLeftWithExchangeRule toRule() {
- return new DynamicPartitionPruningFactInLeftWithExchangeRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalExchange
- .class)
- .oneInput(
- e ->
- e.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs()),
- r ->
- r.operand(
- BatchPhysicalRel
- .class)
- .anyInputs()))
- .as(DynamicPartitionPruningFactInLeftWithExchangeRule.Config.class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalExchange exchange = call.rel(1);
- final BatchPhysicalTableSourceScan factScan = call.rel(2);
- final BatchPhysicalRel dimSide = call.rel(3);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
- final BatchPhysicalExchange newExchange =
- (BatchPhysicalExchange)
- exchange.copy(
- exchange.getTraitSet(), Collections.singletonList(newFactScan));
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide));
- call.transformTo(newJoin);
- }
- }
-
- /** Dynamic filtering pattern with calc node in fact side while fact side in join right. */
- protected static class DynamicPartitionPruningFactInRightWithCalcRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInRightWithCalcRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInRightWithCalcRule toRule() {
- return new DynamicPartitionPruningFactInRightWithCalcRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalRel
- .class)
- .anyInputs(),
- r ->
- r.operand(
- BatchPhysicalCalc
- .class)
- .oneInput(
- f ->
- f.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs())))
- .as(DynamicPartitionPruningFactInRightWithCalcRule.Config.class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalRel dimSide = call.rel(1);
- final BatchPhysicalCalc factCalc = call.rel(2);
- final BatchPhysicalTableSourceScan factScan = call.rel(3);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false);
- final BatchPhysicalCalc newCalc =
- (BatchPhysicalCalc)
- factCalc.copy(
- factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newCalc));
- call.transformTo(newJoin);
- }
- }
-
- /** Dynamic filtering pattern with calc node in fact side while fact side in join left. */
- protected static class DynamicPartitionPruningFactInLeftWithCalcRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInLeftWithCalcRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInLeftWithCalcRule toRule() {
- return new DynamicPartitionPruningFactInLeftWithCalcRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalCalc
- .class)
- .oneInput(
- f ->
- f.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs()),
- r ->
- r.operand(
- BatchPhysicalRel
- .class)
- .anyInputs()))
- .as(DynamicPartitionPruningFactInLeftWithCalcRule.Config.class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalCalc factCalc = call.rel(1);
- final BatchPhysicalTableSourceScan factScan = call.rel(2);
- final BatchPhysicalRel dimSide = call.rel(3);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
- final BatchPhysicalCalc newCalc =
- (BatchPhysicalCalc)
- factCalc.copy(
- factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newCalc, dimSide));
- call.transformTo(newJoin);
- }
- }
-
- /**
- * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in
- * join right.
- */
- protected static class DynamicPartitionPruningFactInRightWithExchangeAndCalcRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInRightWithExchangeAndCalcRule toRule() {
- return new DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalRel
- .class)
- .anyInputs(),
- r ->
- r.operand(
- BatchPhysicalExchange
- .class)
- .oneInput(
- e ->
- e.operand(
- BatchPhysicalCalc
- .class)
- .oneInput(
- f ->
- f.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs()))))
- .as(
- DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config
- .class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalRel dimSide = call.rel(1);
- final BatchPhysicalExchange exchange = call.rel(2);
- final BatchPhysicalCalc factCalc = call.rel(3);
- final BatchPhysicalTableSourceScan factScan = call.rel(4);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false);
- final BatchPhysicalCalc newCalc =
- (BatchPhysicalCalc)
- factCalc.copy(
- factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
- final BatchPhysicalExchange newExchange =
- (BatchPhysicalExchange)
- exchange.copy(
- exchange.getTraitSet(), Collections.singletonList(newCalc));
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange));
- call.transformTo(newJoin);
- }
- }
-
- /**
- * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in
- * join left.
- */
- protected static class DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule
- extends DynamicPartitionPruningRule {
-
- public DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(RelRule.Config config) {
- super(config);
- }
-
- /** Config. */
- public interface Config extends RelRule.Config {
- @Override
- default DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule toRule() {
- return new DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(this);
- }
-
- Config DEFAULT =
- EMPTY.withOperandSupplier(
- b0 ->
- b0.operand(BatchPhysicalJoinBase.class)
- .inputs(
- l ->
- l.operand(
- BatchPhysicalExchange
- .class)
- .oneInput(
- e ->
- e.operand(
- BatchPhysicalCalc
- .class)
- .oneInput(
- f ->
- f.operand(
- BatchPhysicalTableSourceScan
- .class)
- .noInputs())),
- r ->
- r.operand(
- BatchPhysicalRel
- .class)
- .anyInputs()))
- .as(
- DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config
- .class);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final BatchPhysicalJoinBase join = call.rel(0);
- final BatchPhysicalExchange exchange = call.rel(1);
- final BatchPhysicalCalc factCalc = call.rel(2);
- final BatchPhysicalTableSourceScan factScan = call.rel(3);
- final BatchPhysicalRel dimSide = call.rel(4);
-
- final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
- createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
- final BatchPhysicalCalc newCalc =
- (BatchPhysicalCalc)
- factCalc.copy(
- factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
- final BatchPhysicalExchange newExchange =
- (BatchPhysicalExchange)
- exchange.copy(
- exchange.getTraitSet(), Collections.singletonList(newCalc));
- final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide));
- call.transformTo(newJoin);
- }
- }
-}
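For comparison, a stripped-down, hypothetical version of the fixed-pattern style used by the rule file deleted above (everything except the Calcite API is invented here). The operand tree is hard-wired, so every extra shape (an Exchange and/or Calc between the join and the scan, or the fact table on the left instead of the right) needed its own config, which is why the deleted file contained eight variants and why the commit replaces them with the shuttle-based program:

    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.calcite.plan.RelRule;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.core.Join;
    import org.apache.calcite.rel.core.TableScan;

    /** Hypothetical sketch: matches only Join(anything, TableScan), nothing else. */
    public class FactInRightOnlyRule extends RelRule<RelRule.Config> {

        public static final RelRule.Config CONFIG =
                RelRule.Config.EMPTY.withOperandSupplier(
                        b0 ->
                                b0.operand(Join.class)
                                        .inputs(
                                                l -> l.operand(RelNode.class).anyInputs(),
                                                r -> r.operand(TableScan.class).noInputs()));

        public FactInRightOnlyRule() {
            super(CONFIG);
        }

        @Override
        public void onMatch(RelOptRuleCall call) {
            // A real rule would rewrite the matched scan here. The limiting factor is the
            // operand tree above: any node between the Join and the TableScan, or the fact
            // table sitting on the left, breaks the match.
        }
    }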
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index 9f2649cdf8b..90f7b40bc0b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -19,18 +19,23 @@
package org.apache.flink.table.planner.utils;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
@@ -45,6 +50,8 @@ import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
@@ -52,6 +59,8 @@ import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.util.ImmutableIntList;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -60,318 +69,470 @@ import java.util.stream.Collectors;
public class DynamicPartitionPruningUtils {
/**
- * For the input join node, judge whether the join left side and join right side meet the
- * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+ * Judge whether the input RelNode meets the conditions of the dim side in dynamic partition
+ * pruning: it must contain a suitable filter condition, must not scan a partitioned table, and
+ * must read from a single source table.
*/
- public static boolean supportDynamicPartitionPruning(Join join) {
- return supportDynamicPartitionPruning(join, true)
- || supportDynamicPartitionPruning(join, false);
+ public static boolean isDppDimSide(RelNode rel) {
+ DppDimSideChecker dimSideChecker = new DppDimSideChecker(rel);
+ return dimSideChecker.isDppDimSide();
}
/**
- * For the input join node, judge whether the join left side and join right side meet the
- * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
- * requirements, return true.
+ * Judge whether the input RelNode can be converted to the dpp fact side. If the input RelNode
+ * can be converted, this method will return the converted fact side whose partitioned table
+ * source will be converted to {@link BatchPhysicalDynamicFilteringTableSourceScan}. If not,
+ * this method will return the original RelNode.
*/
- public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
- if (!ShortcutUtils.unwrapContext(join)
- .getTableConfig()
- .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
- return false;
- }
- // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
- // join can not join reorder.
- if (join.getJoinType() == JoinRelType.LEFT) {
- if (factInLeft) {
- return false;
- }
- } else if (join.getJoinType() == JoinRelType.RIGHT) {
- if (!factInLeft) {
- return false;
- }
- } else if (join.getJoinType() != JoinRelType.INNER
- && join.getJoinType() != JoinRelType.SEMI) {
+ public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+ RelNode rel,
+ ImmutableIntList joinKeys,
+ RelNode dimSide,
+ ImmutableIntList dimSideJoinKey) {
+ DppFactSideChecker dppFactSideChecker =
+ new DppFactSideChecker(rel, joinKeys, dimSide, dimSideJoinKey);
+ return dppFactSideChecker.canConvertAndConvertDppFactSide();
+ }
+
+ /** Judge whether the join node is a suitable one for the dpp pattern. */
+ public static boolean isSuitableJoin(Join join) {
+ // Now dynamic partition pruning supports left/right join, inner and semi
+ // join, but semi join currently cannot participate in join reorder.
+ if (join.getJoinType() != JoinRelType.INNER
+ && join.getJoinType() != JoinRelType.SEMI
+ && join.getJoinType() != JoinRelType.LEFT
+ && join.getJoinType() != JoinRelType.RIGHT) {
return false;
}
JoinInfo joinInfo = join.analyzeCondition();
- if (joinInfo.leftKeys.isEmpty()) {
- return false;
- }
- RelNode left = join.getLeft();
- RelNode right = join.getRight();
-
- // TODO Now fact side and dim side don't support many complex patterns, like join inside
- // fact/dim side, agg inside fact/dim side etc. which will support next.
- return factInLeft
- ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
- : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
+ return !joinInfo.leftKeys.isEmpty();
}
- private static boolean isDynamicPartitionPruningPattern(
- RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
- return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
- }
+ /** This class is used to check whether the relNode is dpp dim side. */
+ private static class DppDimSideChecker {
+ private final RelNode relNode;
+ private boolean hasFilter;
+ private boolean hasPartitionedScan;
+ private final List<ContextResolvedTable> tables = new ArrayList<>();
- /** make a dpp fact side factor to recurrence in fact side. */
- private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
- DppFactSideFactors factSideFactors = new DppFactSideFactors();
- visitFactSide(rel, factSideFactors, joinKeys);
- return factSideFactors.isFactSide();
- }
+ public DppDimSideChecker(RelNode relNode) {
+ this.relNode = relNode;
+ }
- /**
- * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
- * need not consider the join keys in dim side, which already deal by dynamic partition pruning
- * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
- * changed, this RelNode is not dim side.
- */
- private static boolean isDimSide(RelNode rel) {
- DppDimSideFactors dimSideFactors = new DppDimSideFactors();
- visitDimSide(rel, dimSideFactors);
- return dimSideFactors.isDimSide();
- }
+ public boolean isDppDimSide() {
+ visitDimSide(this.relNode);
+ return hasFilter && !hasPartitionedScan && tables.size() == 1;
+ }
- /**
- * Visit fact side to judge whether fact side has partition table, partition table source meets
- * the condition of dpp table source and dynamic filtering keys changed in fact side.
- */
- private static void visitFactSide(
- RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
- if (rel instanceof TableScan) {
- TableScan scan = (TableScan) rel;
- if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
- // rule applied
- factSideFactors.isSuitableFactScanSource = false;
- return;
- }
- TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
- if (tableSourceTable == null) {
- factSideFactors.isSuitableFactScanSource = false;
- return;
- }
- CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable();
- List<String> partitionKeys = catalogTable.getPartitionKeys();
- if (partitionKeys.isEmpty()) {
- factSideFactors.isSuitableFactScanSource = false;
- return;
- }
- DynamicTableSource tableSource = tableSourceTable.tableSource();
- if (!(tableSource instanceof SupportsDynamicFiltering)
- || !(tableSource instanceof ScanTableSource)) {
- factSideFactors.isSuitableFactScanSource = false;
- return;
- }
- if (!isNewSource((ScanTableSource) tableSource)) {
- factSideFactors.isSuitableFactScanSource = false;
- return;
- }
+ /**
+ * Visit dim side to judge whether dim side has filter condition and whether dim side's
+ * source table scan is non partitioned scan.
+ */
+ private void visitDimSide(RelNode rel) {
+ // TODO Make visitDimSide more efficient and more accurate, e.g. a filter on the dim table
+ // or a filter for the partition field on the fact table.
+ if (rel instanceof TableScan) {
+ TableScan scan = (TableScan) rel;
+ TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+ if (table == null) {
+ return;
+ }
+ if (!hasFilter
+ && table.abilitySpecs() != null
+ && table.abilitySpecs().length != 0) {
+ for (SourceAbilitySpec spec : table.abilitySpecs()) {
+ if (spec instanceof FilterPushDownSpec) {
+ List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
+ for (RexNode predicate : predicates) {
+ if (isSuitableFilter(predicate)) {
+ hasFilter = true;
+ }
+ }
+ }
+ }
+ }
+ CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
+ if (catalogTable.isPartitioned()) {
+ hasPartitionedScan = true;
+ return;
+ }
- List<String> candidateFields =
- joinKeys.stream()
- .map(i -> scan.getRowType().getFieldNames().get(i))
- .collect(Collectors.toList());
- if (candidateFields.isEmpty()) {
- factSideFactors.isSuitableFactScanSource = false;
- return;
+ // To ensure there is only one source on the dim side.
+ setTables(table.contextResolvedTable());
+ } else if (rel instanceof HepRelVertex) {
+ visitDimSide(((HepRelVertex) rel).getCurrentRel());
+ } else if (rel instanceof Exchange || rel instanceof Project) {
+ visitDimSide(rel.getInput(0));
+ } else if (rel instanceof Calc) {
+ RexProgram origProgram = ((Calc) rel).getProgram();
+ if (origProgram.getCondition() != null
+ && isSuitableFilter(
+ origProgram.expandLocalRef(origProgram.getCondition()))) {
+ hasFilter = true;
+ }
+ visitDimSide(rel.getInput(0));
+ } else if (rel instanceof Filter) {
+ if (isSuitableFilter(((Filter) rel).getCondition())) {
+ hasFilter = true;
+ }
+ visitDimSide(rel.getInput(0));
+ } else if (rel instanceof Join) {
+ Join join = (Join) rel;
+ visitDimSide(join.getLeft());
+ visitDimSide(join.getRight());
+ } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+ visitDimSide(((BatchPhysicalGroupAggregateBase) rel).getInput());
+ } else if (rel instanceof Union) {
+ Union union = (Union) rel;
+ for (RelNode input : union.getInputs()) {
+ visitDimSide(input);
+ }
}
+ }
- factSideFactors.isSuitableFactScanSource =
- !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields)
- .isEmpty();
- } else if (rel instanceof HepRelVertex) {
- visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
- } else if (rel instanceof Exchange || rel instanceof Filter) {
- visitFactSide(rel.getInput(0), factSideFactors, joinKeys);
- } else if (rel instanceof Project) {
- List<RexNode> projects = ((Project) rel).getProjects();
- ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
- if (inputJoinKeys.isEmpty()) {
- factSideFactors.isSuitableJoinKey = false;
- return;
+ /**
+ * Not all filter conditions are suitable for filtering partitions via dynamic partition
+ * pruning rules. For example, NOT NULL can only filter one default partition, which has a
+ * small impact on filtering data.
+ */
+ private static boolean isSuitableFilter(RexNode filterCondition) {
+ switch (filterCondition.getKind()) {
+ case AND:
+ List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
+ return isSuitableFilter(conjunctions.get(0))
+ || isSuitableFilter(conjunctions.get(1));
+ case OR:
+ List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
+ return isSuitableFilter(disjunctions.get(0))
+ && isSuitableFilter(disjunctions.get(1));
+ case NOT:
+ return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
+ case EQUALS:
+ case GREATER_THAN:
+ case GREATER_THAN_OR_EQUAL:
+ case LESS_THAN:
+ case LESS_THAN_OR_EQUAL:
+ case NOT_EQUALS:
+ case IN:
+ case LIKE:
+ case CONTAINS:
+ case SEARCH:
+ case IS_FALSE:
+ case IS_NOT_FALSE:
+ case IS_NOT_TRUE:
+ case IS_TRUE:
+ // TODO adding more suitable filters which can filter enough partitions after
+ // using this filter in dynamic partition pruning.
+ return true;
+ default:
+ return false;
}
+ }
- visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
- } else if (rel instanceof Calc) {
- Calc calc = (Calc) rel;
- RexProgram program = calc.getProgram();
- List<RexNode> projects =
- program.getProjectList().stream()
- .map(program::expandLocalRef)
- .collect(Collectors.toList());
- ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
- if (inputJoinKeys.isEmpty()) {
- factSideFactors.isSuitableJoinKey = false;
- return;
+ private void setTables(ContextResolvedTable catalogTable) {
+ if (tables.size() == 0) {
+ tables.add(catalogTable);
+ } else {
+ for (ContextResolvedTable thisTable : new ArrayList<>(tables)) {
+ if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
+ tables.add(catalogTable);
+ }
+ }
}
-
- visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
}
}
- public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
- DynamicTableSource tableSource, List<String> candidateFields) {
- List<String> acceptedFilterFields =
- ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
- if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
- return new ArrayList<>();
+ /** This class is used to check whether the relNode can be used as the fact side in dpp. */
+ private static class DppFactSideChecker {
+ private final RelNode relNode;
+ private final ImmutableIntList joinKeys;
+ private final RelNode dimSide;
+ private final ImmutableIntList dimSideJoinKey;
+
+ // Whether the fact side has actually been converted, i.e. a dynamic filtering scan was created.
+ private boolean isChanged;
+
+ public DppFactSideChecker(
+ RelNode relNode,
+ ImmutableIntList joinKeys,
+ RelNode dimSide,
+ ImmutableIntList dimSideJoinKey) {
+ this.relNode = relNode;
+ this.joinKeys = joinKeys;
+ this.dimSide = dimSide;
+ this.dimSideJoinKey = dimSideJoinKey;
}
- List<String> suitableFields = new ArrayList<>();
- // If candidateField not in acceptedFilterFields means dpp rule will not be matched,
- // because we can not prune any partitions according to non-accepted filter fields
- // provided by partition table source.
- for (String candidateField : candidateFields) {
- if (acceptedFilterFields.contains(candidateField)) {
- suitableFields.add(candidateField);
- }
+ public Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide() {
+ return Tuple2.of(
+ isChanged, convertDppFactSide(relNode, joinKeys, dimSide, dimSideJoinKey));
}
- return suitableFields;
- }
+ private RelNode convertDppFactSide(
+ RelNode rel,
+ ImmutableIntList joinKeys,
+ RelNode dimSide,
+ ImmutableIntList dimSideJoinKey) {
+ if (rel instanceof TableScan) {
+ TableScan scan = (TableScan) rel;
+ if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
+ // rule applied
+ return rel;
+ }
+ TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable == null) {
+ return rel;
+ }
+ CatalogTable catalogTable =
+ tableSourceTable.contextResolvedTable().getResolvedTable();
+ List<String> partitionKeys = catalogTable.getPartitionKeys();
+ if (partitionKeys.isEmpty()) {
+ return rel;
+ }
+ DynamicTableSource tableSource = tableSourceTable.tableSource();
+ if (!(tableSource instanceof SupportsDynamicFiltering)
+ || !(tableSource instanceof ScanTableSource)) {
+ return rel;
+ }
- /**
- * Visit dim side to judge whether dim side has filter condition and whether dim side's source
- * table scan is non partitioned scan.
- */
- private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) {
- // TODO Let visitDimSide more efficient and more accurate. Like a filter on dim table or a
- // filter for the partition field on fact table.
- if (rel instanceof TableScan) {
- TableScan scan = (TableScan) rel;
- TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
- if (table == null) {
- return;
- }
- if (!dimSideFactors.hasFilter
- && table.abilitySpecs() != null
- && table.abilitySpecs().length != 0) {
- for (SourceAbilitySpec spec : table.abilitySpecs()) {
- if (spec instanceof FilterPushDownSpec) {
- List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
- for (RexNode predicate : predicates) {
- if (isSuitableFilter(predicate)) {
- dimSideFactors.hasFilter = true;
- }
- }
+ // Dpp cannot succeed if the source has an aggregate push-down spec.
+ if (Arrays.stream(tableSourceTable.abilitySpecs())
+ .anyMatch(spec -> spec instanceof AggregatePushDownSpec)) {
+ return rel;
+ }
+
+ if (!isNewSource((ScanTableSource) tableSource)) {
+ return rel;
+ }
+
+ List<String> candidateFields =
+ joinKeys.stream()
+ .map(i -> scan.getRowType().getFieldNames().get(i))
+ .collect(Collectors.toList());
+ if (candidateFields.isEmpty()) {
+ return rel;
+ }
+
+ List<String> acceptedFilterFields =
+ getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
+
+ if (acceptedFilterFields.size() == 0) {
+ return rel;
+ }
+
+ // Apply suitable accepted filter fields to source.
+ ((SupportsDynamicFiltering) tableSource)
+ .applyDynamicFiltering(acceptedFilterFields);
+
+ List<Integer> acceptedFieldIndices =
+ acceptedFilterFields.stream()
+ .map(f -> scan.getRowType().getFieldNames().indexOf(f))
+ .collect(Collectors.toList());
+ List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
+ for (int i = 0; i < joinKeys.size(); ++i) {
+ if (acceptedFieldIndices.contains(joinKeys.get(i))) {
+ dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
}
}
+
+ BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
+ createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
+
+ isChanged = true;
+ return new BatchPhysicalDynamicFilteringTableSourceScan(
+ scan.getCluster(),
+ scan.getTraitSet(),
+ scan.getHints(),
+ tableSourceTable,
+ dynamicFilteringDataCollector);
+ } else if (rel instanceof Exchange || rel instanceof Filter) {
+ return rel.copy(
+ rel.getTraitSet(),
+ Collections.singletonList(
+ convertDppFactSide(
+ rel.getInput(0), joinKeys, dimSide, dimSideJoinKey)));
+ } else if (rel instanceof Project) {
+ List<RexNode> projects = ((Project) rel).getProjects();
+ ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+ if (inputJoinKeys.isEmpty()) {
+ return rel;
+ }
+
+ return rel.copy(
+ rel.getTraitSet(),
+ Collections.singletonList(
+ convertDppFactSide(
+ rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+ } else if (rel instanceof Calc) {
+ Calc calc = (Calc) rel;
+ RexProgram program = calc.getProgram();
+ List<RexNode> projects =
+ program.getProjectList().stream()
+ .map(program::expandLocalRef)
+ .collect(Collectors.toList());
+ ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+ if (inputJoinKeys.isEmpty()) {
+ return rel;
+ }
+
+ return rel.copy(
+ rel.getTraitSet(),
+ Collections.singletonList(
+ convertDppFactSide(
+ rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+ } else if (rel instanceof Join) {
+ Join currentJoin = (Join) rel;
+ return currentJoin.copy(
+ currentJoin.getTraitSet(),
+ Arrays.asList(
+ convertDppFactSide(
+ currentJoin.getLeft(),
+ getInputIndices(currentJoin, joinKeys, true),
+ dimSide,
+ dimSideJoinKey),
+ convertDppFactSide(
+ currentJoin.getRight(),
+ getInputIndices(currentJoin, joinKeys, false),
+ dimSide,
+ dimSideJoinKey)));
+ } else if (rel instanceof Union) {
+ Union union = (Union) rel;
+ List<RelNode> newInputs = new ArrayList<>();
+ for (RelNode input : union.getInputs()) {
+ newInputs.add(convertDppFactSide(input, joinKeys, dimSide, dimSideJoinKey));
+ }
+ return union.copy(union.getTraitSet(), newInputs, union.all);
+ } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+ BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
+ RelNode input = agg.getInput();
+ int[] grouping = agg.grouping();
+
+ // If any join key in joinKeys refers to an aggregate function field, dpp will not succeed.
+ for (int k : joinKeys) {
+ if (k >= grouping.length) {
+ return rel;
+ }
+ }
+
+ RelNode convertedRel =
+ convertDppFactSide(
+ input,
+ ImmutableIntList.copyOf(
+ joinKeys.stream()
+ .map(joinKey -> agg.grouping()[joinKey])
+ .collect(Collectors.toList())),
+ dimSide,
+ dimSideJoinKey);
+ return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel));
+ } else {
+ // TODO In the future, we need to support more operators to enrich matchable dpp
+ // pattern.
}
- CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
- dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned();
- } else if (rel instanceof HepRelVertex) {
- visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors);
- } else if (rel instanceof Exchange || rel instanceof Project) {
- visitDimSide(rel.getInput(0), dimSideFactors);
- } else if (rel instanceof Calc) {
- RexProgram origProgram = ((Calc) rel).getProgram();
- if (origProgram.getCondition() != null
- && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
- dimSideFactors.hasFilter = true;
+
+ return rel;
+ }
+
+ private static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+ DynamicTableSource tableSource, List<String> candidateFields) {
+ List<String> acceptedFilterFields =
+ ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+ if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+ return new ArrayList<>();
}
- visitDimSide(rel.getInput(0), dimSideFactors);
- } else if (rel instanceof Filter) {
- if (isSuitableFilter(((Filter) rel).getCondition())) {
- dimSideFactors.hasFilter = true;
+
+ List<String> suitableFields = new ArrayList<>();
+ // If a candidateField is not in acceptedFilterFields, the dpp rule will not be matched,
+ // because we cannot prune any partitions according to non-accepted filter fields
+ // provided by the partition table source.
+ for (String candidateField : candidateFields) {
+ if (acceptedFilterFields.contains(candidateField)) {
+ suitableFields.add(candidateField);
+ }
}
- visitDimSide(rel.getInput(0), dimSideFactors);
- }
- }
- /**
- * Not all filter condition suitable for using to filter partitions by dynamic partition pruning
- * rules. For example, NOT NULL can only filter one default partition which have a small impact
- * on filtering data.
- */
- private static boolean isSuitableFilter(RexNode filterCondition) {
- switch (filterCondition.getKind()) {
- case AND:
- List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
- return isSuitableFilter(conjunctions.get(0))
- || isSuitableFilter(conjunctions.get(1));
- case OR:
- List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
- return isSuitableFilter(disjunctions.get(0))
- && isSuitableFilter(disjunctions.get(1));
- case NOT:
- return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
- case EQUALS:
- case GREATER_THAN:
- case GREATER_THAN_OR_EQUAL:
- case LESS_THAN:
- case LESS_THAN_OR_EQUAL:
- case NOT_EQUALS:
- case IN:
- case LIKE:
- case CONTAINS:
- case SEARCH:
- case IS_FALSE:
- case IS_NOT_FALSE:
- case IS_NOT_TRUE:
- case IS_TRUE:
- // TODO adding more suitable filters which can filter enough partitions after using
- // this filter in dynamic partition pruning.
- return true;
- default:
- return false;
+ return suitableFields;
}
- }
- /** Returns true if the source is FLIP-27 source, else false. */
- private static boolean isNewSource(ScanTableSource scanTableSource) {
- ScanTableSource.ScanRuntimeProvider provider =
- scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
- if (provider instanceof SourceProvider) {
- return true;
- } else if (provider instanceof TransformationScanProvider) {
- Transformation<?> transformation =
- ((TransformationScanProvider) provider)
- .createTransformation(name -> Optional.empty());
- return transformation instanceof SourceTransformation;
- } else if (provider instanceof DataStreamScanProvider) {
- // Suppose DataStreamScanProvider of sources that support dynamic filtering will use new
- // Source. It's not reliable and should be checked.
- // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is actually a
- // new source.
- // This situation will not generate wrong result because it's handled when translating
- // BatchTableSourceScan. The only effect is the physical plan and the exec node plan
- // have DPP nodes, but they do not work in runtime.
- return true;
+ private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
+ RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
+ final RelDataType outputType =
+ ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
+ .projectStructType(
+ dimSide.getRowType(),
+ dynamicFilteringFieldIndices.stream()
+ .mapToInt(i -> i)
+ .toArray());
+ return new BatchPhysicalDynamicFilteringDataCollector(
+ dimSide.getCluster(),
+ dimSide.getTraitSet(),
+ ignoreExchange(dimSide),
+ outputType,
+ dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
}
- // TODO supports more
- return false;
- }
- private static ImmutableIntList getInputIndices(
- List<RexNode> projects, ImmutableIntList joinKeys) {
- List<Integer> indices = new ArrayList<>();
- for (int k : joinKeys) {
- RexNode rexNode = projects.get(k);
- if (rexNode instanceof RexInputRef) {
- indices.add(((RexInputRef) rexNode).getIndex());
+ private static RelNode ignoreExchange(RelNode dimSide) {
+ if (dimSide instanceof Exchange) {
+ return dimSide.getInput(0);
+ } else {
+ return dimSide;
}
}
- return ImmutableIntList.copyOf(indices);
- }
-
- /** This class is used to remember dim side messages while recurring in dim side. */
- private static class DppDimSideFactors {
- private boolean hasFilter;
- private boolean hasNonPartitionedScan;
- public boolean isDimSide() {
- return hasFilter && hasNonPartitionedScan;
+ /** Returns true if the source is FLIP-27 source, else false. */
+ private static boolean isNewSource(ScanTableSource scanTableSource) {
+ ScanTableSource.ScanRuntimeProvider provider =
+ scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ if (provider instanceof SourceProvider) {
+ return true;
+ } else if (provider instanceof TransformationScanProvider) {
+ Transformation<?> transformation =
+ ((TransformationScanProvider) provider)
+ .createTransformation(name -> Optional.empty());
+ return transformation instanceof SourceTransformation;
+ } else if (provider instanceof DataStreamScanProvider) {
+ // Assume that a DataStreamScanProvider of a source that supports dynamic filtering
+ // uses the new Source API. This assumption is not reliable and should be checked.
+ // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is
+ // actually a new source. This situation will not produce wrong results because it is
+ // handled when translating BatchTableSourceScan. The only effect is that the physical
+ // plan and the exec node plan contain DPP nodes which do not take effect at runtime.
+ return true;
+ }
+ // TODO supports more
+ return false;
}
- }
- /** This class is used to remember fact side messages while recurring in fact side. */
- private static class DppFactSideFactors {
- private boolean isSuitableFactScanSource;
- // If join key is not changed in fact side, this value is always true.
- private boolean isSuitableJoinKey = true;
+ private static ImmutableIntList getInputIndices(
+ List<RexNode> projects, ImmutableIntList joinKeys) {
+ List<Integer> indices = new ArrayList<>();
+ for (int k : joinKeys) {
+ RexNode rexNode = projects.get(k);
+ if (rexNode instanceof RexInputRef) {
+ indices.add(((RexInputRef) rexNode).getIndex());
+ }
+ }
+ return ImmutableIntList.copyOf(indices);
+ }
- public boolean isFactSide() {
- return isSuitableFactScanSource && isSuitableJoinKey;
+ private static ImmutableIntList getInputIndices(
+ Join join, ImmutableIntList joinKeys, boolean isLeft) {
+ List<Integer> indices = new ArrayList<>();
+ RelNode left = join.getLeft();
+ int leftSize = left.getRowType().getFieldCount();
+ for (int k : joinKeys) {
+ if (isLeft) {
+ if (k < leftSize) {
+ indices.add(k);
+ }
+ } else {
+ if (k >= leftSize) {
+ indices.add(k - leftSize);
+ }
+ }
+ }
+ return ImmutableIntList.copyOf(indices);
}
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
index b20868b1835..f5dec96af68 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch._
import org.apache.flink.table.planner.plan.stats.ValueInterval
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, SortUtil}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasTimeIntervalType, toLong}
-import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.adapter.enumerable.EnumerableLimit
@@ -336,22 +335,11 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
fmq.getSelectivity(joinWithOnlyEquiPred, nonEquiPred)
}
- // Currently, join-reorder is before dynamic partition pruning rewrite. This factor
- // is adding to adjust join cost for these join node which meets dynamic partition
- // pruning pattern. Try best to reorder the fact table and fact table together to
- // make DPP succeed.
- val dynamicPartitionPruningFactor =
- if (DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join)) {
- 0.0001
- } else {
- 1
- }
-
if (leftNdv != null && rightNdv != null) {
// selectivity of equi part is 1 / Max(leftNdv, rightNdv)
val selectivityOfEquiPred = Math.min(1d, 1d / Math.max(leftNdv, rightNdv))
return leftRowCount * rightRowCount * selectivityOfEquiPred *
- selectivityOfNonEquiPred * dynamicPartitionPruningFactor
+ selectivityOfNonEquiPred
}
val leftKeysAreUnique = fmq.areColumnsUnique(leftChild, leftKeySet)
@@ -369,14 +357,14 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
} else {
leftRowCount * selectivityOfNonEquiPred
}
- return outputRowCount * dynamicPartitionPruningFactor
+ return outputRowCount
}
// if joinCondition has no ndv stats and no uniqueKeys stats,
// rowCount = (leftRowCount + rightRowCount) * join condition selectivity
val crossJoin = copyJoinWithNewCondition(join, rexBuilder.makeLiteral(true))
val selectivity = fmq.getSelectivity(crossJoin, condition)
- (leftRowCount + rightRowCount) * selectivity * dynamicPartitionPruningFactor
+ (leftRowCount + rightRowCount) * selectivity
}
private def copyJoinWithNewCondition(join: Join, newCondition: RexNode): Join = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
index f64197c3836..a1784867149 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
@@ -40,6 +40,7 @@ object FlinkBatchProgram {
val TIME_INDICATOR = "time_indicator"
val PHYSICAL = "physical"
val PHYSICAL_REWRITE = "physical_rewrite"
+ val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"
def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]()
@@ -288,6 +289,9 @@ object FlinkBatchProgram {
.build()
)
+ // Convert table source scans that match the DPP pattern into dynamic filtering scans.
+ chainedProgram.addLast(DYNAMIC_PARTITION_PRUNING, new FlinkDynamicPartitionPruningProgram)
+
chainedProgram
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 39fa34cbaf6..6551d936c58 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -433,18 +433,13 @@ object FlinkBatchRuleSets {
/** RuleSet to optimize plans after batch exec execution. */
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
- (RuleSets
- .ofList(
- EnforceLocalHashAggRule.INSTANCE,
- EnforceLocalSortAggRule.INSTANCE,
- PushLocalHashAggIntoScanRule.INSTANCE,
- PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
- PushLocalSortAggIntoScanRule.INSTANCE,
- PushLocalSortAggWithSortIntoScanRule.INSTANCE,
- PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
- PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
- )
- .asScala ++
- DynamicPartitionPruningRule.DYNAMIC_PARTITION_PRUNING_RULES.asScala).asJava
+ EnforceLocalHashAggRule.INSTANCE,
+ EnforceLocalSortAggRule.INSTANCE,
+ PushLocalHashAggIntoScanRule.INSTANCE,
+ PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
+ PushLocalSortAggIntoScanRule.INSTANCE,
+ PushLocalSortAggWithSortIntoScanRule.INSTANCE,
+ PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
+ PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
)
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
similarity index 76%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
index 0a7f0c7bc9d..cef12b156aa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.rules.physical.batch;
+package org.apache.flink.table.planner.plan.optimize.program;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -40,10 +40,10 @@ import java.util.List;
import java.util.Map;
/**
- * Test for rules that extend {@link DynamicPartitionPruningRule} to create {@link
+ * Tests for {@link FlinkDynamicPartitionPruningProgram}, which creates {@link
* org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan}.
*/
-public class DynamicPartitionPruningRuleTest extends TableTestBase {
+public class DynamicPartitionPruningProgramTest extends TableTestBase {
private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
private final TestValuesCatalog catalog =
new TestValuesCatalog("testCatalog", "test_database", true);
@@ -381,7 +381,7 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
@Test
public void testComplexDimSideWithJoinInDimSide() {
- // Dim side contains join will not succeed in this version, it will improve later.
+ // TODO: DPP does not yet succeed when the dim side is complex (e.g. contains a join).
util.tableEnv()
.executeSql(
"CREATE TABLE sales (\n"
@@ -434,13 +434,175 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
util.verifyRelPlan(query);
}
- // --------------------------dpp factor test ---------------------------------------------
+ @Test
+ public void testDppWithoutJoinReorder() {
+ // DPP will succeed.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+ TableConfig tableConfig = util.tableEnv().getConfig();
+ // Join reorder is not enabled.
+ tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
+
+ String query =
+ "Select * from fact_part, item, dim"
+ + " where fact_part.fact_date_sk = dim.dim_date_sk"
+ + " and fact_part.id = item.id"
+ + " and dim.id = item.id "
+ + " and dim.price < 500 and dim.price > 300";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithSubQuery() {
+ // DPP will succeed.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+ TableConfig tableConfig = util.tableEnv().getConfig();
+ // Join reorder is not enabled.
+ tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
+
+ String query =
+ "Select * from fact_part, item, dim"
+ + " where fact_part.id = item.id"
+ + " and dim.price in (select price from dim where amount = (select amount from dim where amount = 2000))"
+ + " and fact_part.fact_date_sk = dim.dim_date_sk";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithUnionInFactSide() {
+ // DPP will succeed.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+
+ String query =
+ "Select * from (select id, fact_date_sk, amount + 1 as amount1 from fact_part where price = 1 union all "
+ + "select id, fact_date_sk, amount + 1 from fact_part where price = 2) fact_part2, item, dim"
+ + " where fact_part2.fact_date_sk = dim.dim_date_sk"
+ + " and fact_part2.id = item.id"
+ + " and dim.price < 500 and dim.price > 300";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithAggInFactSideAndJoinKeyInGrouping() {
+ // DPP will succeed.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+
+ String query =
+ "Select * from (Select fact_date_sk, item.amount, sum(fact_part.price) from fact_part "
+ + "join item on fact_part.id = item.id group by fact_date_sk, item.amount) t1 "
+ + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithAggInFactSideAndJoinKeyInGroupFunction() {
+ // DPP will not succeed because the join key is produced by an aggregate function.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+
+ String query =
+ "Select * from (Select fact_part.id, item.amount, fact_part.name, sum(fact_part.price), sum(item.price), sum(fact_date_sk) as fact_date_sk1 "
+ + "from fact_part join item on fact_part.id = item.id "
+ + "group by fact_part.id, fact_part.name, item.amount) t1 "
+ + "join dim on t1.fact_date_sk1 = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithAggInFactSideWithAggPushDownEnable() {
+ // DPP will not succeed when the fact-side source supports aggregate push-down and
+ // source aggregate push-down is enabled.
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+
+ String query =
+ "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) "
+ + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 "
+ + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+ util.verifyRelPlan(query);
+ }
+
+ @Test
+ public void testDppWithAggInFactSideWithAggPushDownDisable() {
+ // DPP will succeed when the fact-side source supports aggregate push-down but source
+ // aggregate push-down is disabled.
+ TableConfig tableConfig = util.tableEnv().getConfig();
+ // Disable source agg push down.
+ tableConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, false);
+
+ String ddl =
+ "CREATE TABLE test_database.item (\n"
+ + " id BIGINT,\n"
+ + " amount BIGINT,\n"
+ + " price BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+
+ String query =
+ "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) "
+ + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 "
+ + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+ util.verifyRelPlan(query);
+ }
@Test
- public void testDPPFactorToReorderTableWithoutStats() {
- // While there are several joins, and fact table not adjacent to dim table directly. dynamic
- // partition pruning factor will try best to reorder join relations to make fact table
- // adjacent to dim table.
+ public void testDPPWithJoinReorderTableWithoutStats() {
+ // DPP will succeed.
String ddl =
"CREATE TABLE test_database.item (\n"
+ " id BIGINT,\n"
@@ -465,7 +627,7 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
}
@Test
- public void testDPPFactorToReorderTableWithStats() throws TableNotExistException {
+ public void testDPPWithJoinReorderTableWithStats() throws TableNotExistException {
String ddl =
"CREATE TABLE test_database.item (\n"
+ " id BIGINT,\n"
@@ -518,8 +680,8 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
}
@Test
- public void testDPPFactorWithFactSideJoinKeyChanged() {
- // If partition keys changed in fact side. DPP factor will not work.
+ public void testDPPWithFactSideJoinKeyChanged() {
+ // If the partition key is changed on the fact side, DPP will not succeed.
String ddl =
"CREATE TABLE test_database.item (\n"
+ " id BIGINT,\n"
@@ -543,8 +705,8 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
}
@Test
- public void testDPPFactorWithDimSideJoinKeyChanged() {
- // Although partition keys changed in dim side. DPP factor will work.
+ public void testDPPWithDimSideJoinKeyChanged() {
+ // Although the partition key is changed on the dim side, DPP will still succeed.
String ddl =
"CREATE TABLE test_database.item (\n"
+ " id BIGINT,\n"
@@ -568,9 +730,9 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
}
@Test
- public void testDPPFactorWithJoinKeysNotIncludePartitionKeys() {
- // If join keys of partition table join with dim table not include partition keys, dpp
- // factor will not be adjusted and dpp will not succeed.
+ public void testDPPWithJoinKeysNotIncludePartitionKeys() {
+ // If the join keys between the partitioned table and the dim table do not include the
+ // partition keys, DPP will not succeed.
String ddl =
"CREATE TABLE test_database.item (\n"
+ " id BIGINT,\n"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
similarity index 72%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
index b9d9dfc726b..5ff1bafdb97 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
@@ -91,7 +91,13 @@ LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], d
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1], build=[right])
:- Exchange(distribution=[hash[fact_date_sk]])
-: +- TableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+: +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+: +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: +- HashAggregate(isMerge=[true], groupBy=[dim_date_sk], select=[dim_date_sk, Final_SUM(sum$0) AS EXPR$1])
+: +- Exchange(distribution=[hash[dim_date_sk]])
+: +- LocalHashAggregate(groupBy=[dim_date_sk], select=[dim_date_sk, Partial_SUM(price) AS sum$0])
+: +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+: +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+- HashAggregate(isMerge=[true], groupBy=[dim_date_sk], select=[dim_date_sk, Final_SUM(sum$0) AS EXPR$1])
+- Exchange(distribution=[hash[dim_date_sk]])
+- LocalHashAggregate(groupBy=[dim_date_sk], select=[dim_date_sk, Partial_SUM(price) AS sum$0])
@@ -237,72 +243,144 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id,
]]>
</Resource>
</TestCase>
- <TestCase name="testDPPFactorToReorderTableWithoutStats">
+ <TestCase name="testDppWithAggInFactSideAndJoinKeyInGroupFunction">
<Resource name="sql">
- <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
+ <![CDATA[Select * from (Select fact_part.id, item.amount, fact_part.name, sum(fact_part.price), sum(item.price), sum(fact_date_sk) as fact_date_sk1 from fact_part join item on fact_part.id = item.id group by fact_part.id, fact_part.name, item.amount) t1 join dim on t1.fact_date_sk1 = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
-+- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalJoin(condition=[true], joinType=[inner])
- : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
- : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+LogicalProject(id=[$0], amount=[$1], name=[$2], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5], id0=[$6], male=[$7], amount0=[$8], price=[$9], dim_date_sk=[$10])
++- LogicalFilter(condition=[AND(<($9, 500), >($9, 300))])
+ +- LogicalJoin(condition=[=($5, $10)], joinType=[inner])
+ :- LogicalProject(id=[$0], amount=[$2], name=[$1], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5])
+ : +- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)], EXPR$4=[SUM($4)], fact_date_sk1=[SUM($5)])
+ : +- LogicalProject(id=[$0], name=[$1], amount=[$6], price=[$3], price0=[$7], fact_date_sk=[$4])
+ : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+- LogicalTableScan(table=[[testCatalog, test_database, dim]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
-+- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
- :- Exchange(distribution=[hash[id]])
- : +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
- +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
- :- Exchange(distribution=[hash[id]])
- : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
- : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
- : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
- : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
- +- Exchange(distribution=[hash[id]])
- +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
- +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk1, dim_date_sk)], select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1, id0, male, amount0, price, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk1]])
+: +- Calc(select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1])
+: +- HashAggregate(isMerge=[false], groupBy=[id, name, amount], select=[id, name, amount, SUM(price) AS EXPR$3, SUM(price0) AS EXPR$4, SUM(fact_date_sk) AS fact_date_sk1])
+: +- Calc(select=[id, name, amount, price, price0, fact_date_sk])
+: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, price, fact_date_sk, id0, amount, price0], build=[right])
+: :- Exchange(distribution=[hash[id]])
+: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, project=[id, name, price, fact_date_sk], metadata=[]]], fields=[id, name, price, fact_date_sk])
+: +- Exchange(distribution=[hash[id]])
+: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
]]>
</Resource>
</TestCase>
- <TestCase name="testDPPFactorToReorderTableWithStats">
+ <TestCase name="testDppWithAggInFactSideAndJoinKeyInGrouping">
<Resource name="sql">
- <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
+ <![CDATA[Select * from (Select fact_date_sk, item.amount, sum(fact_part.price) from fact_part join item on fact_part.id = item.id group by fact_date_sk, item.amount) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
-+- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalJoin(condition=[true], joinType=[inner])
- : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
- : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+LogicalProject(fact_date_sk=[$0], amount=[$1], EXPR$2=[$2], id=[$3], male=[$4], amount0=[$5], price=[$6], dim_date_sk=[$7])
++- LogicalFilter(condition=[AND(<($6, 500), >($6, 300))])
+ +- LogicalJoin(condition=[=($0, $7)], joinType=[inner])
+ :- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
+ : +- LogicalProject(fact_date_sk=[$4], amount=[$6], price=[$3])
+ : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+- LogicalTableScan(table=[[testCatalog, test_database, dim]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
- :- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
- +- Exchange(distribution=[broadcast])
- +- NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
- :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
- : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
- : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
- : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
- +- Exchange(distribution=[broadcast])
- +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
- +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[fact_date_sk, amount, EXPR$2, id, male, amount0, price, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- HashAggregate(isMerge=[true], groupBy=[fact_date_sk, amount], select=[fact_date_sk, amount, Final_SUM(sum$0) AS EXPR$2])
+: +- Exchange(distribution=[hash[fact_date_sk, amount]])
+: +- LocalHashAggregate(groupBy=[fact_date_sk, amount], select=[fact_date_sk, amount, Partial_SUM(price) AS sum$0])
+: +- Calc(select=[fact_date_sk, amount, price])
+: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, price, fact_date_sk, id0, amount], build=[right])
+: :- Exchange(distribution=[hash[id]])
+: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[id, price, fact_date_sk], metadata=[]]], fields=[id, price, fact_date_sk])
+: : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+: +- Exchange(distribution=[hash[id]])
+: +- TableSourceScan(table=[[testCatalog, test_database, item, project=[id, amount], metadata=[]]], fields=[id, amount])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDppWithAggInFactSideWithAggPushDownDisable">
+ <Resource name="sql">
+ <![CDATA[Select * from (Select id, amount, fact_date_sk, count(name), sum(price) from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], amount=[$1], fact_date_sk=[$2], EXPR$3=[$3], EXPR$4=[$4], id0=[$5], male=[$6], amount0=[$7], price=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(<($8, 500), >($8, 300))])
+ +- LogicalJoin(condition=[=($2, $9)], joinType=[inner])
+ :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)])
+ : +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3])
+ : +- LogicalFilter(condition=[>($4, 100)])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, amount, fact_date_sk, EXPR$3, EXPR$4, id0, male, amount0, price, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- HashAggregate(isMerge=[true], groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Final_COUNT(count$0) AS EXPR$3, Final_SUM(sum$1) AS EXPR$4])
+: +- Exchange(distribution=[hash[id, amount, fact_date_sk]])
+: +- LocalHashAggregate(groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Partial_COUNT(name) AS count$0, Partial_SUM(price) AS sum$1])
+: +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, partitions=[{fact_date_sk=1990}, {fact_date_sk=1991}, {fact_date_sk=1992}]]], fields=[id, name, amount, price, fact_date_sk])
+: +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+: +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDppWithAggInFactSideWithAggPushDownEnable">
+ <Resource name="sql">
+ <![CDATA[Select * from (Select id, amount, fact_date_sk, count(name), sum(price) from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], amount=[$1], fact_date_sk=[$2], EXPR$3=[$3], EXPR$4=[$4], id0=[$5], male=[$6], amount0=[$7], price=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(<($8, 500), >($8, 300))])
+ +- LogicalJoin(condition=[=($2, $9)], joinType=[inner])
+ :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)])
+ : +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3])
+ : +- LogicalFilter(condition=[>($4, 100)])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, amount, fact_date_sk, EXPR$3, EXPR$4, id0, male, amount0, price, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- HashAggregate(isMerge=[true], groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Final_COUNT(count$0) AS EXPR$3, Final_SUM(sum$1) AS EXPR$4])
+: +- Exchange(distribution=[hash[id, amount, fact_date_sk]])
+: +- TableSourceScan(table=[[testCatalog, test_database, fact_part, partitions=[{fact_date_sk=1990}, {fact_date_sk=1991}, {fact_date_sk=1992}], aggregates=[grouping=[id,amount,fact_date_sk], aggFunctions=[CountAggFunction(name),LongSumAggFunction(price)]]]], fields=[id, amount, fact_date_sk, count$0, sum$1])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
]]>
</Resource>
</TestCase>
- <TestCase name="testDPPFactorWithDimSideJoinKeyChanged">
+ <TestCase name="testDPPWithDimSideJoinKeyChanged">
<Resource name="sql">
<![CDATA[Select * from fact_part join item on fact_part.id = item.id join (select dim_date_sk + 1 as dim_date_sk, price from dim) dim1 on fact_part.fact_date_sk = dim1.dim_date_sk where dim1.price < 500 and dim1.price > 300]]>
</Resource>
@@ -337,7 +415,7 @@ Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk,
]]>
</Resource>
</TestCase>
- <TestCase name="testDPPFactorWithFactSideJoinKeyChanged">
+ <TestCase name="testDPPWithFactSideJoinKeyChanged">
<Resource name="sql">
<![CDATA[Select * from (select fact_date_sk + 1 as fact_date_sk, id from fact_part) fact_part1 join item on fact_part1.id = item.id join dim on fact_part1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300]]>
</Resource>
@@ -370,7 +448,7 @@ Calc(select=[fact_date_sk, id0 AS id, id AS id0, amount, price, id00 AS id1, mal
]]>
</Resource>
</TestCase>
- <TestCase name="testDPPFactorWithJoinKeysNotIncludePartitionKeys">
+ <TestCase name="testDPPWithJoinKeysNotIncludePartitionKeys">
<Resource name="sql">
<![CDATA[Select * from fact_part, item, dim where fact_part.id = dim.id and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
</Resource>
@@ -396,6 +474,213 @@ HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, pri
+- Exchange(distribution=[hash[id]])
+- Calc(select=[id, male, amount, price, dim_date_sk], where=[AND(SEARCH(price, Sarg[(300..500)]), IS NOT NULL(id))])
+- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDPPWithJoinReorderTableWithoutStats">
+ <Resource name="sql">
+ <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[id]])
+ : +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+ +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[id]])
+ : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+ : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+ +- Exchange(distribution=[hash[id]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDPPWithJoinReorderTableWithStats">
+ <Resource name="sql">
+ <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- NestedLoopJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+ :- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+ +- Exchange(distribution=[broadcast])
+ +- NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
+ :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+ : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDppWithoutJoinReorder">
+ <Resource name="sql">
+ <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id and dim.price < 500 and dim.price > 300]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id1, id0))], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk, id0]])
+: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+: :- Exchange(distribution=[hash[id]])
+: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+: : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+: +- Exchange(distribution=[hash[id]])
+: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk, id]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDppWithSubQuery">
+ <Resource name="sql">
+ <![CDATA[Select * from fact_part, item, dim where fact_part.id = item.id and dim.price in (select price from dim where amount = (select amount from dim where amount = 2000)) and fact_part.fact_date_sk = dim.dim_date_sk]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($0, $5), IN($11, {
+LogicalProject(price=[$3])
+ LogicalFilter(condition=[=($2, $SCALAR_QUERY({
+LogicalProject(amount=[$2])
+ LogicalFilter(condition=[=($2, 2000)])
+ LogicalTableScan(table=[[testCatalog, test_database, dim]])
+}))])
+ LogicalTableScan(table=[[testCatalog, test_database, dim]])
+}), =($4, $12))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+: :- Exchange(distribution=[hash[id]])
+: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+: : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : +- HashJoin(joinType=[LeftSemiJoin], where=[=(price, price0)], select=[id, male, amount, price, dim_date_sk], build=[right])
+: : :- Exchange(distribution=[hash[price]])
+: : : +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, male, amount, price, dim_date_sk])
+: : +- Exchange(distribution=[hash[price]])
+: : +- Calc(select=[price])
+: : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(amount, $f0)], select=[amount, price, $f0], build=[right], singleRowJoin=[true])
+: : :- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price], metadata=[]]], fields=[amount, price])
+: : +- Exchange(distribution=[broadcast])
+: : +- HashAggregate(isMerge=[true], select=[Final_SINGLE_VALUE(value$0, count$1) AS $f0])
+: : +- Exchange(distribution=[single])
+: : +- LocalHashAggregate(select=[Partial_SINGLE_VALUE(amount) AS (value$0, count$1)])
+: : +- Calc(select=[CAST(2000 AS BIGINT) AS amount], where=[=(amount, 2000)])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[amount], metadata=[]]], fields=[amount])
+: +- Exchange(distribution=[hash[id]])
+: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- HashJoin(joinType=[LeftSemiJoin], where=[=(price, price0)], select=[id, male, amount, price, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[price]])
+ : +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, male, amount, price, dim_date_sk])
+ +- Exchange(distribution=[hash[price]])
+ +- Calc(select=[price])
+ +- NestedLoopJoin(joinType=[InnerJoin], where=[=(amount, $f0)], select=[amount, price, $f0], build=[right], singleRowJoin=[true])
+ :- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price], metadata=[]]], fields=[amount, price])
+ +- Exchange(distribution=[broadcast])
+ +- HashAggregate(isMerge=[true], select=[Final_SINGLE_VALUE(value$0, count$1) AS $f0])
+ +- Exchange(distribution=[single])
+ +- LocalHashAggregate(select=[Partial_SINGLE_VALUE(amount) AS (value$0, count$1)])
+ +- Calc(select=[CAST(2000 AS BIGINT) AS amount], where=[=(amount, 2000)])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[amount], metadata=[]]], fields=[amount])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDppWithUnionInFactSide">
+ <Resource name="sql">
+ <![CDATA[Select * from (select id, fact_date_sk, amount + 1 as amount1 from fact_part where price = 1 union all select id, fact_date_sk, amount + 1 from fact_part where price = 2) fact_part2, item, dim where fact_part2.fact_date_sk = dim.dim_date_sk and fact_part2.id = item.id and dim.price < 500 and dim.price > 300]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], fact_date_sk=[$1], amount1=[$2], id0=[$3], amount=[$4], price=[$5], id1=[$6], male=[$7], amount0=[$8], price0=[$9], dim_date_sk=[$10])
++- LogicalFilter(condition=[AND(=($1, $10), =($0, $3), <($9, 500), >($9, 300))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalUnion(all=[true])
+ : : :- LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[+($2, 1)])
+ : : : +- LogicalFilter(condition=[=($3, 1)])
+ : : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : : +- LogicalProject(id=[$0], fact_date_sk=[$4], EXPR$2=[+($2, 1)])
+ : : +- LogicalFilter(condition=[=($3, 2)])
+ : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+ : +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+ +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, fact_date_sk, amount1, id0, amount, price, id1, male, amount0, price0, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk]])
+: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, fact_date_sk, amount1, id0, amount, price], build=[left])
+: :- Exchange(distribution=[hash[id]])
+: : +- Union(all=[true], union=[id, fact_date_sk, amount1])
+: : :- Calc(select=[id, fact_date_sk, +(amount, 1) AS amount1], where=[=(price, 1)])
+: : : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, price, fact_date_sk])
+: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+: : : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+: : +- Calc(select=[id, fact_date_sk, +(amount, 1) AS EXPR$2], where=[=(price, 2)])
+: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, price, fact_date_sk])
+: : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+: : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+: +- Exchange(distribution=[hash[id]])
+: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+ +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+ +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
]]>
</Resource>
</TestCase>