You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2023/01/09 04:31:40 UTC

[flink] branch master updated: [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f9b55f82d [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns
d9f9b55f82d is described below

commit d9f9b55f82dfbc1676572cc36b718a99001497f8
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Dec 12 12:06:38 2022 +0800

    [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns
    
    This closes #21489
---
 .../FlinkDynamicPartitionPruningProgram.java       | 148 +++++
 .../batch/DynamicPartitionPruningRule.java         | 739 ---------------------
 .../utils/DynamicPartitionPruningUtils.java        | 699 +++++++++++--------
 .../planner/plan/metadata/FlinkRelMdRowCount.scala |  18 +-
 .../plan/optimize/program/FlinkBatchProgram.scala  |   4 +
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  21 +-
 .../DynamicPartitionPruningProgramTest.java}       | 196 +++++-
 .../DynamicPartitionPruningProgramTest.xml}        | 375 +++++++++--
 8 files changed, 1102 insertions(+), 1098 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java
new file mode 100644
index 00000000000..22aebd7fcab
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition prune in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ *  * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
+ *  * +- Exchange(distribution=[broadcast])
+ *  *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *  *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>Note: We use a {@link FlinkOptimizeProgram} instead of a {@link
+ * org.apache.calcite.plan.RelRule} here because the {@link org.apache.calcite.plan.hep.HepPlanner}
+ * doesn't support matching a partially determined pattern or dynamically replacing the inputs of
+ * matched nodes. Once we improve the {@link org.apache.calcite.plan.hep.HepPlanner}, then class can
+ * be converted to {@link org.apache.calcite.plan.RelRule}.
+ */
+public class FlinkDynamicPartitionPruningProgram
+        implements FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        if (!ShortcutUtils.unwrapContext(root)
+                .getTableConfig()
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+            return root;
+        }
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(RelNode rel) {
+                        if (!(rel instanceof Join)
+                                || !DynamicPartitionPruningUtils.isSuitableJoin((Join) rel)) {
+                            List<RelNode> newInputs = new ArrayList<>();
+                            for (RelNode input : rel.getInputs()) {
+                                RelNode newInput = input.accept(this);
+                                newInputs.add(newInput);
+                            }
+                            return rel.copy(rel.getTraitSet(), newInputs);
+                        }
+                        Join join = (Join) rel;
+                        JoinInfo joinInfo = join.analyzeCondition();
+                        RelNode leftSide = join.getLeft();
+                        RelNode rightSide = join.getRight();
+                        Join newJoin = join;
+                        boolean changed = false;
+                        if (DynamicPartitionPruningUtils.isDppDimSide(leftSide)) {
+                            if (join.getJoinType() != JoinRelType.RIGHT) {
+                                Tuple2<Boolean, RelNode> relTuple =
+                                        DynamicPartitionPruningUtils
+                                                .canConvertAndConvertDppFactSide(
+                                                        rightSide,
+                                                        joinInfo.rightKeys,
+                                                        leftSide,
+                                                        joinInfo.leftKeys);
+                                changed = relTuple.f0;
+                                newJoin =
+                                        join.copy(
+                                                join.getTraitSet(),
+                                                Arrays.asList(leftSide, relTuple.f1.accept(this)));
+                            }
+                        } else if (DynamicPartitionPruningUtils.isDppDimSide(rightSide)) {
+                            if (join.getJoinType() != JoinRelType.LEFT) {
+                                Tuple2<Boolean, RelNode> relTuple =
+                                        DynamicPartitionPruningUtils
+                                                .canConvertAndConvertDppFactSide(
+                                                        leftSide,
+                                                        joinInfo.leftKeys,
+                                                        rightSide,
+                                                        joinInfo.rightKeys);
+                                changed = relTuple.f0;
+                                newJoin =
+                                        join.copy(
+                                                join.getTraitSet(),
+                                                Arrays.asList(relTuple.f1.accept(this), rightSide));
+                            }
+                        }
+
+                        if (changed) {
+                            return newJoin;
+                        } else {
+                            return newJoin.copy(
+                                    newJoin.getTraitSet(),
+                                    Arrays.asList(
+                                            newJoin.getLeft().accept(this),
+                                            newJoin.getRight().accept(this)));
+                        }
+                    }
+                };
+        return shuttle.visit(root);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
deleted file mode 100644
index b0767d27f05..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
+++ /dev/null
@@ -1,739 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.physical.batch;
-
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalJoinBase;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.calcite.tools.RuleSets;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Planner rule that tries to do partition prune in the execution phase, which can translate a
- * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
- * whose source is a partition source. The {@link
- * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true.
- *
- * <p>Suppose we have the original physical plan:
- *
- * <pre>{@code
- * LogicalProject(...)
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- *  * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
- *  * +- Exchange(distribution=[broadcast])
- *  *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- *  *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- *
- * <p>This physical plan will be rewritten to:
- *
- * <pre>{@code
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
- * :  +- DynamicFilteringDataCollector(fields=[dim_key])
- * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
- * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * +- Exchange(distribution=[broadcast])
- *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- */
-public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config> {
-
-    // To support more patterns of dynamic partition pruning, in this rule base, we provide eight
-    // different matching rules according to the different situation of the fact side (partition
-    // table side) and the different order of left/right join.
-    public static final RuleSet DYNAMIC_PARTITION_PRUNING_RULES =
-            RuleSets.ofList(
-                    DynamicPartitionPruningFactInRightRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInLeftRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInRightWithExchangeRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInLeftWithExchangeRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInRightWithCalcRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInLeftWithCalcRule.Config.DEFAULT.toRule(),
-                    DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config.DEFAULT
-                            .toRule(),
-                    DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config.DEFAULT
-                            .toRule());
-
-    protected DynamicPartitionPruningRule(RelRule.Config config) {
-        super(config);
-    }
-
-    private static List<Integer> getAcceptedFieldIndices(
-            List<Integer> factJoinKeys,
-            @Nullable BatchPhysicalCalc factCalc,
-            BatchPhysicalTableSourceScan factScan,
-            DynamicTableSource tableSource) {
-        List<String> candidateFields;
-        if (factCalc == null) {
-            candidateFields =
-                    factJoinKeys.stream()
-                            .map(i -> factScan.getRowType().getFieldNames().get(i))
-                            .collect(Collectors.toList());
-        } else {
-            // Changing the fact key index in fact table calc output to fact key index in fact
-            // table, and filtering these fields that computing in calc node.
-            RexProgram program = factCalc.getProgram();
-            List<Integer> joinKeysIndexInFactTable = new ArrayList<>();
-            for (int joinKeyIdx : factJoinKeys) {
-                RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx));
-                if (node instanceof RexInputRef) {
-                    joinKeysIndexInFactTable.add(((RexInputRef) node).getIndex());
-                }
-            }
-
-            if (joinKeysIndexInFactTable.isEmpty()) {
-                return Collections.emptyList();
-            }
-
-            candidateFields =
-                    joinKeysIndexInFactTable.stream()
-                            .map(i -> factScan.getRowType().getFieldNames().get(i))
-                            .collect(Collectors.toList());
-        }
-
-        List<String> acceptedFilterFields =
-                DynamicPartitionPruningUtils.getSuitableDynamicFilteringFieldsInFactSide(
-                        tableSource, candidateFields);
-        // Apply suitable accepted filter fields to source.
-        ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
-
-        if (factCalc == null) {
-            return acceptedFilterFields.stream()
-                    .map(f -> factScan.getRowType().getFieldNames().indexOf(f))
-                    .collect(Collectors.toList());
-        } else {
-            return getAcceptedFieldsIndicesInCalc(
-                    acceptedFilterFields, factJoinKeys, factCalc, factScan);
-        }
-    }
-
-    private static List<Integer> getAcceptedFieldsIndicesInCalc(
-            List<String> acceptedFields,
-            List<Integer> factJoinKeys,
-            BatchPhysicalCalc factCalc,
-            BatchPhysicalTableSourceScan factScan) {
-        List<Integer> acceptedFieldsIndicesInFactScan =
-                acceptedFields.stream()
-                        .map(f -> factScan.getRowType().getFieldNames().indexOf(f))
-                        .collect(Collectors.toList());
-        RexProgram program = factCalc.getProgram();
-        List<Integer> acceptedFieldsIndicesInCalc = new ArrayList<>();
-        for (int joinKeyIdx : factJoinKeys) {
-            RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx));
-            if (node instanceof RexInputRef
-                    && acceptedFieldsIndicesInFactScan.contains(((RexInputRef) node).getIndex())) {
-                acceptedFieldsIndicesInCalc.add(joinKeyIdx);
-            }
-        }
-
-        return acceptedFieldsIndicesInCalc;
-    }
-
-    protected BatchPhysicalDynamicFilteringTableSourceScan createDynamicFilteringTableSourceScan(
-            BatchPhysicalTableSourceScan factScan,
-            BatchPhysicalRel dimSide,
-            BatchPhysicalJoinBase join,
-            @Nullable BatchPhysicalCalc factCalc,
-            boolean factInLeft) {
-        JoinInfo joinInfo = join.analyzeCondition();
-        TableSourceTable tableSourceTable = factScan.getTable().unwrap(TableSourceTable.class);
-        DynamicTableSource tableSource = tableSourceTable.tableSource();
-
-        List<Integer> factJoinKeys = factInLeft ? joinInfo.leftKeys : joinInfo.rightKeys;
-        List<Integer> dimJoinKeys = factInLeft ? joinInfo.rightKeys : joinInfo.leftKeys;
-
-        List<Integer> acceptedFieldIndices =
-                getAcceptedFieldIndices(factJoinKeys, factCalc, factScan, tableSource);
-
-        List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
-        for (int i = 0; i < joinInfo.leftKeys.size(); ++i) {
-            if (acceptedFieldIndices.contains(factJoinKeys.get(i))) {
-                dynamicFilteringFieldIndices.add(dimJoinKeys.get(i));
-            }
-        }
-        final BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
-                createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
-        return new BatchPhysicalDynamicFilteringTableSourceScan(
-                factScan.getCluster(),
-                factScan.getTraitSet(),
-                factScan.getHints(),
-                factScan.tableSourceTable(),
-                dynamicFilteringDataCollector);
-    }
-
-    private BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
-            RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
-        final RelDataType outputType =
-                ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
-                        .projectStructType(
-                                dimSide.getRowType(),
-                                dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
-        return new BatchPhysicalDynamicFilteringDataCollector(
-                dimSide.getCluster(),
-                dimSide.getTraitSet(),
-                ignoreExchange(dimSide),
-                outputType,
-                dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
-    }
-
-    private RelNode ignoreExchange(RelNode dimSide) {
-        if (dimSide instanceof Exchange) {
-            return dimSide.getInput(0);
-        } else {
-            return dimSide;
-        }
-    }
-
-    /** Simple dynamic filtering pattern with fact side in join right. */
-    protected static class DynamicPartitionPruningFactInRightRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInRightRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs(),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalTableSourceScan
-                                                                                            .class)
-                                                                            .noInputs()))
-                            .as(DynamicPartitionPruningFactInRightRule.Config.class);
-
-            @Override
-            default DynamicPartitionPruningFactInRightRule toRule() {
-                return new DynamicPartitionPruningFactInRightRule(this);
-            }
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalRel dimSide = call.rel(1);
-            final BatchPhysicalTableSourceScan factScan = call.rel(2);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newFactScan));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /** Simple dynamic filtering pattern with fact side in join left. */
-    protected static class DynamicPartitionPruningFactInLeftRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInLeftRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInLeftRule toRule() {
-                return new DynamicPartitionPruningFactInLeftRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalTableSourceScan
-                                                                                            .class)
-                                                                            .noInputs(),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs()))
-                            .as(DynamicPartitionPruningFactInLeftRule.Config.class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalTableSourceScan factScan = call.rel(1);
-            final BatchPhysicalRel dimSide = call.rel(2);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newFactScan, dimSide));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /** Dynamic filtering pattern with exchange node in fact side while fact side in join right. */
-    protected static class DynamicPartitionPruningFactInRightWithExchangeRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInRightWithExchangeRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInRightWithExchangeRule toRule() {
-                return new DynamicPartitionPruningFactInRightWithExchangeRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs(),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalExchange
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    e ->
-                                                                                            e.operand(
-                                                                                                            BatchPhysicalTableSourceScan
-                                                                                                                    .class)
-                                                                                                    .noInputs())))
-                            .as(DynamicPartitionPruningFactInRightWithExchangeRule.Config.class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalRel dimSide = call.rel(1);
-            final BatchPhysicalExchange exchange = call.rel(2);
-            final BatchPhysicalTableSourceScan factScan = call.rel(3);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
-            final BatchPhysicalExchange newExchange =
-                    (BatchPhysicalExchange)
-                            exchange.copy(
-                                    exchange.getTraitSet(), Collections.singletonList(newFactScan));
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /** Dynamic filtering pattern with exchange node in fact side while fact side in join left. */
-    protected static class DynamicPartitionPruningFactInLeftWithExchangeRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInLeftWithExchangeRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInLeftWithExchangeRule toRule() {
-                return new DynamicPartitionPruningFactInLeftWithExchangeRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalExchange
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    e ->
-                                                                                            e.operand(
-                                                                                                            BatchPhysicalTableSourceScan
-                                                                                                                    .class)
-                                                                                                    .noInputs()),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs()))
-                            .as(DynamicPartitionPruningFactInLeftWithExchangeRule.Config.class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalExchange exchange = call.rel(1);
-            final BatchPhysicalTableSourceScan factScan = call.rel(2);
-            final BatchPhysicalRel dimSide = call.rel(3);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
-            final BatchPhysicalExchange newExchange =
-                    (BatchPhysicalExchange)
-                            exchange.copy(
-                                    exchange.getTraitSet(), Collections.singletonList(newFactScan));
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /** Dynamic filtering pattern with calc node in fact side while fact side in join right. */
-    protected static class DynamicPartitionPruningFactInRightWithCalcRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInRightWithCalcRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInRightWithCalcRule toRule() {
-                return new DynamicPartitionPruningFactInRightWithCalcRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs(),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalCalc
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    f ->
-                                                                                            f.operand(
-                                                                                                            BatchPhysicalTableSourceScan
-                                                                                                                    .class)
-                                                                                                    .noInputs())))
-                            .as(DynamicPartitionPruningFactInRightWithCalcRule.Config.class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalRel dimSide = call.rel(1);
-            final BatchPhysicalCalc factCalc = call.rel(2);
-            final BatchPhysicalTableSourceScan factScan = call.rel(3);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false);
-            final BatchPhysicalCalc newCalc =
-                    (BatchPhysicalCalc)
-                            factCalc.copy(
-                                    factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newCalc));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /** Dynamic filtering pattern with calc node in fact side while fact side in join left. */
-    protected static class DynamicPartitionPruningFactInLeftWithCalcRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInLeftWithCalcRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInLeftWithCalcRule toRule() {
-                return new DynamicPartitionPruningFactInLeftWithCalcRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalCalc
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    f ->
-                                                                                            f.operand(
-                                                                                                            BatchPhysicalTableSourceScan
-                                                                                                                    .class)
-                                                                                                    .noInputs()),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs()))
-                            .as(DynamicPartitionPruningFactInLeftWithCalcRule.Config.class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalCalc factCalc = call.rel(1);
-            final BatchPhysicalTableSourceScan factScan = call.rel(2);
-            final BatchPhysicalRel dimSide = call.rel(3);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
-            final BatchPhysicalCalc newCalc =
-                    (BatchPhysicalCalc)
-                            factCalc.copy(
-                                    factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newCalc, dimSide));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /**
-     * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in
-     * join right.
-     */
-    protected static class DynamicPartitionPruningFactInRightWithExchangeAndCalcRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInRightWithExchangeAndCalcRule toRule() {
-                return new DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs(),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalExchange
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    e ->
-                                                                                            e.operand(
-                                                                                                            BatchPhysicalCalc
-                                                                                                                    .class)
-                                                                                                    .oneInput(
-                                                                                                            f ->
-                                                                                                                    f.operand(
-                                                                                                                                    BatchPhysicalTableSourceScan
-                                                                                                                                            .class)
-                                                                                                                            .noInputs()))))
-                            .as(
-                                    DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config
-                                            .class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalRel dimSide = call.rel(1);
-            final BatchPhysicalExchange exchange = call.rel(2);
-            final BatchPhysicalCalc factCalc = call.rel(3);
-            final BatchPhysicalTableSourceScan factScan = call.rel(4);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false);
-            final BatchPhysicalCalc newCalc =
-                    (BatchPhysicalCalc)
-                            factCalc.copy(
-                                    factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
-            final BatchPhysicalExchange newExchange =
-                    (BatchPhysicalExchange)
-                            exchange.copy(
-                                    exchange.getTraitSet(), Collections.singletonList(newCalc));
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange));
-            call.transformTo(newJoin);
-        }
-    }
-
-    /**
-     * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in
-     * join left.
-     */
-    protected static class DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule
-            extends DynamicPartitionPruningRule {
-
-        public DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(RelRule.Config config) {
-            super(config);
-        }
-
-        /** Config. */
-        public interface Config extends RelRule.Config {
-            @Override
-            default DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule toRule() {
-                return new DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(this);
-            }
-
-            Config DEFAULT =
-                    EMPTY.withOperandSupplier(
-                                    b0 ->
-                                            b0.operand(BatchPhysicalJoinBase.class)
-                                                    .inputs(
-                                                            l ->
-                                                                    l.operand(
-                                                                                    BatchPhysicalExchange
-                                                                                            .class)
-                                                                            .oneInput(
-                                                                                    e ->
-                                                                                            e.operand(
-                                                                                                            BatchPhysicalCalc
-                                                                                                                    .class)
-                                                                                                    .oneInput(
-                                                                                                            f ->
-                                                                                                                    f.operand(
-                                                                                                                                    BatchPhysicalTableSourceScan
-                                                                                                                                            .class)
-                                                                                                                            .noInputs())),
-                                                            r ->
-                                                                    r.operand(
-                                                                                    BatchPhysicalRel
-                                                                                            .class)
-                                                                            .anyInputs()))
-                            .as(
-                                    DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config
-                                            .class);
-        }
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true);
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-            final BatchPhysicalJoinBase join = call.rel(0);
-            final BatchPhysicalExchange exchange = call.rel(1);
-            final BatchPhysicalCalc factCalc = call.rel(2);
-            final BatchPhysicalTableSourceScan factScan = call.rel(3);
-            final BatchPhysicalRel dimSide = call.rel(4);
-
-            final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
-                    createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
-            final BatchPhysicalCalc newCalc =
-                    (BatchPhysicalCalc)
-                            factCalc.copy(
-                                    factCalc.getTraitSet(), newFactScan, factCalc.getProgram());
-            final BatchPhysicalExchange newExchange =
-                    (BatchPhysicalExchange)
-                            exchange.copy(
-                                    exchange.getTraitSet(), Collections.singletonList(newCalc));
-            final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide));
-            call.transformTo(newJoin);
-        }
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index 9f2649cdf8b..90f7b40bc0b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -19,18 +19,23 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.connectors.TransformationScanProvider;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
 import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector;
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 
@@ -45,6 +50,8 @@ import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
@@ -52,6 +59,8 @@ import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.util.ImmutableIntList;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -60,318 +69,470 @@ import java.util.stream.Collectors;
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+     * Judge whether the input RelNode meets the conditions of dimSide. If joinKeys is null means we
+     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
+     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
+     * changed, this RelNode is not dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDppDimSide(RelNode rel) {
+        DppDimSideChecker dimSideChecker = new DppDimSideChecker(rel);
+        return dimSideChecker.isDppDimSide();
     }
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
+     * Judge whether the input RelNode can be converted to the dpp fact side. If the input RelNode
+     * can be converted, this method will return the converted fact side whose partitioned table
+     * source will be converted to {@link BatchPhysicalDynamicFilteringTableSourceScan}, If not,
+     * this method will return the origin RelNode.
      */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey) {
+        DppFactSideChecker dppFactSideChecker =
+                new DppFactSideChecker(rel, joinKeys, dimSide, dimSideJoinKey);
+        return dppFactSideChecker.canConvertAndConvertDppFactSide();
+    }
+
+    /** Judge whether the join node is suitable one for dpp pattern. */
+    public static boolean isSuitableJoin(Join join) {
+        // Now dynamic partition pruning supports left/right join, inner and semi
+        // join. but now semi join can not join reorder.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.SEMI
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
             return false;
         }
 
         JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
-        }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
+        return !joinInfo.leftKeys.isEmpty();
     }
 
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
-    }
+    /** This class is used to check whether the relNode is dpp dim side. */
+    private static class DppDimSideChecker {
+        private final RelNode relNode;
+        private boolean hasFilter;
+        private boolean hasPartitionedScan;
+        private final List<ContextResolvedTable> tables = new ArrayList<>();
 
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
-        DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
-    }
+        public DppDimSideChecker(RelNode relNode) {
+            this.relNode = relNode;
+        }
 
-    /**
-     * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
-     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
-     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
-     * changed, this RelNode is not dim side.
-     */
-    private static boolean isDimSide(RelNode rel) {
-        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
-        visitDimSide(rel, dimSideFactors);
-        return dimSideFactors.isDimSide();
-    }
+        public boolean isDppDimSide() {
+            visitDimSide(this.relNode);
+            return hasFilter && !hasPartitionedScan && tables.size() == 1;
+        }
 
-    /**
-     * Visit fact side to judge whether fact side has partition table, partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
-        if (rel instanceof TableScan) {
-            TableScan scan = (TableScan) rel;
-            if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
-                // rule applied
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-            if (tableSourceTable == null) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable();
-            List<String> partitionKeys = catalogTable.getPartitionKeys();
-            if (partitionKeys.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            DynamicTableSource tableSource = tableSourceTable.tableSource();
-            if (!(tableSource instanceof SupportsDynamicFiltering)
-                    || !(tableSource instanceof ScanTableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            if (!isNewSource((ScanTableSource) tableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
+        /**
+         * Visit dim side to judge whether dim side has filter condition and whether dim side's
+         * source table scan is non partitioned scan.
+         */
+        private void visitDimSide(RelNode rel) {
+            // TODO Let visitDimSide more efficient and more accurate. Like a filter on dim table or
+            // a filter for the partition field on fact table.
+            if (rel instanceof TableScan) {
+                TableScan scan = (TableScan) rel;
+                TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+                if (table == null) {
+                    return;
+                }
+                if (!hasFilter
+                        && table.abilitySpecs() != null
+                        && table.abilitySpecs().length != 0) {
+                    for (SourceAbilitySpec spec : table.abilitySpecs()) {
+                        if (spec instanceof FilterPushDownSpec) {
+                            List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
+                            for (RexNode predicate : predicates) {
+                                if (isSuitableFilter(predicate)) {
+                                    hasFilter = true;
+                                }
+                            }
+                        }
+                    }
+                }
+                CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
+                if (catalogTable.isPartitioned()) {
+                    hasPartitionedScan = true;
+                    return;
+                }
 
-            List<String> candidateFields =
-                    joinKeys.stream()
-                            .map(i -> scan.getRowType().getFieldNames().get(i))
-                            .collect(Collectors.toList());
-            if (candidateFields.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                // To ensure there is only one source on the dim side.
+                setTables(table.contextResolvedTable());
+            } else if (rel instanceof HepRelVertex) {
+                visitDimSide(((HepRelVertex) rel).getCurrentRel());
+            } else if (rel instanceof Exchange || rel instanceof Project) {
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Calc) {
+                RexProgram origProgram = ((Calc) rel).getProgram();
+                if (origProgram.getCondition() != null
+                        && isSuitableFilter(
+                                origProgram.expandLocalRef(origProgram.getCondition()))) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Filter) {
+                if (isSuitableFilter(((Filter) rel).getCondition())) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Join) {
+                Join join = (Join) rel;
+                visitDimSide(join.getLeft());
+                visitDimSide(join.getRight());
+            } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+                visitDimSide(((BatchPhysicalGroupAggregateBase) rel).getInput());
+            } else if (rel instanceof Union) {
+                Union union = (Union) rel;
+                for (RelNode input : union.getInputs()) {
+                    visitDimSide(input);
+                }
             }
+        }
 
-            factSideFactors.isSuitableFactScanSource =
-                    !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields)
-                            .isEmpty();
-        } else if (rel instanceof HepRelVertex) {
-            visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
-        } else if (rel instanceof Exchange || rel instanceof Filter) {
-            visitFactSide(rel.getInput(0), factSideFactors, joinKeys);
-        } else if (rel instanceof Project) {
-            List<RexNode> projects = ((Project) rel).getProjects();
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+        /**
+         * Not all filter condition suitable for using to filter partitions by dynamic partition
+         * pruning rules. For example, NOT NULL can only filter one default partition which have a
+         * small impact on filtering data.
+         */
+        private static boolean isSuitableFilter(RexNode filterCondition) {
+            switch (filterCondition.getKind()) {
+                case AND:
+                    List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
+                    return isSuitableFilter(conjunctions.get(0))
+                            || isSuitableFilter(conjunctions.get(1));
+                case OR:
+                    List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
+                    return isSuitableFilter(disjunctions.get(0))
+                            && isSuitableFilter(disjunctions.get(1));
+                case NOT:
+                    return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
+                case EQUALS:
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                case NOT_EQUALS:
+                case IN:
+                case LIKE:
+                case CONTAINS:
+                case SEARCH:
+                case IS_FALSE:
+                case IS_NOT_FALSE:
+                case IS_NOT_TRUE:
+                case IS_TRUE:
+                    // TODO adding more suitable filters which can filter enough partitions after
+                    // using this filter in dynamic partition pruning.
+                    return true;
+                default:
+                    return false;
             }
+        }
 
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
-        } else if (rel instanceof Calc) {
-            Calc calc = (Calc) rel;
-            RexProgram program = calc.getProgram();
-            List<RexNode> projects =
-                    program.getProjectList().stream()
-                            .map(program::expandLocalRef)
-                            .collect(Collectors.toList());
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+        private void setTables(ContextResolvedTable catalogTable) {
+            if (tables.size() == 0) {
+                tables.add(catalogTable);
+            } else {
+                for (ContextResolvedTable thisTable : new ArrayList<>(tables)) {
+                    if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
+                        tables.add(catalogTable);
+                    }
+                }
             }
-
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
         }
     }
 
-    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
-            DynamicTableSource tableSource, List<String> candidateFields) {
-        List<String> acceptedFilterFields =
-                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
-        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
-            return new ArrayList<>();
+    /** This class is used to check whether the relNode can be as a fact side in dpp. */
+    private static class DppFactSideChecker {
+        private final RelNode relNode;
+        private final ImmutableIntList joinKeys;
+        private final RelNode dimSide;
+        private final ImmutableIntList dimSideJoinKey;
+
+        // If join key is not changed in fact side, this value is always true.
+        private boolean isChanged;
+
+        public DppFactSideChecker(
+                RelNode relNode,
+                ImmutableIntList joinKeys,
+                RelNode dimSide,
+                ImmutableIntList dimSideJoinKey) {
+            this.relNode = relNode;
+            this.joinKeys = joinKeys;
+            this.dimSide = dimSide;
+            this.dimSideJoinKey = dimSideJoinKey;
         }
 
-        List<String> suitableFields = new ArrayList<>();
-        // If candidateField not in acceptedFilterFields means dpp rule will not be matched,
-        // because we can not prune any partitions according to non-accepted filter fields
-        // provided by partition table source.
-        for (String candidateField : candidateFields) {
-            if (acceptedFilterFields.contains(candidateField)) {
-                suitableFields.add(candidateField);
-            }
+        public Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide() {
+            return Tuple2.of(
+                    isChanged, convertDppFactSide(relNode, joinKeys, dimSide, dimSideJoinKey));
         }
 
-        return suitableFields;
-    }
+        private RelNode convertDppFactSide(
+                RelNode rel,
+                ImmutableIntList joinKeys,
+                RelNode dimSide,
+                ImmutableIntList dimSideJoinKey) {
+            if (rel instanceof TableScan) {
+                TableScan scan = (TableScan) rel;
+                if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
+                    // rule applied
+                    return rel;
+                }
+                TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+                if (tableSourceTable == null) {
+                    return rel;
+                }
+                CatalogTable catalogTable =
+                        tableSourceTable.contextResolvedTable().getResolvedTable();
+                List<String> partitionKeys = catalogTable.getPartitionKeys();
+                if (partitionKeys.isEmpty()) {
+                    return rel;
+                }
+                DynamicTableSource tableSource = tableSourceTable.tableSource();
+                if (!(tableSource instanceof SupportsDynamicFiltering)
+                        || !(tableSource instanceof ScanTableSource)) {
+                    return rel;
+                }
 
-    /**
-     * Visit dim side to judge whether dim side has filter condition and whether dim side's source
-     * table scan is non partitioned scan.
-     */
-    private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) {
-        // TODO Let visitDimSide more efficient and more accurate. Like a filter on dim table or a
-        // filter for the partition field on fact table.
-        if (rel instanceof TableScan) {
-            TableScan scan = (TableScan) rel;
-            TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
-            if (table == null) {
-                return;
-            }
-            if (!dimSideFactors.hasFilter
-                    && table.abilitySpecs() != null
-                    && table.abilitySpecs().length != 0) {
-                for (SourceAbilitySpec spec : table.abilitySpecs()) {
-                    if (spec instanceof FilterPushDownSpec) {
-                        List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
-                        for (RexNode predicate : predicates) {
-                            if (isSuitableFilter(predicate)) {
-                                dimSideFactors.hasFilter = true;
-                            }
-                        }
+                // Dpp cannot success if source have aggregate push down spec.
+                if (Arrays.stream(tableSourceTable.abilitySpecs())
+                        .anyMatch(spec -> spec instanceof AggregatePushDownSpec)) {
+                    return rel;
+                }
+
+                if (!isNewSource((ScanTableSource) tableSource)) {
+                    return rel;
+                }
+
+                List<String> candidateFields =
+                        joinKeys.stream()
+                                .map(i -> scan.getRowType().getFieldNames().get(i))
+                                .collect(Collectors.toList());
+                if (candidateFields.isEmpty()) {
+                    return rel;
+                }
+
+                List<String> acceptedFilterFields =
+                        getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
+
+                if (acceptedFilterFields.size() == 0) {
+                    return rel;
+                }
+
+                // Apply suitable accepted filter fields to source.
+                ((SupportsDynamicFiltering) tableSource)
+                        .applyDynamicFiltering(acceptedFilterFields);
+
+                List<Integer> acceptedFieldIndices =
+                        acceptedFilterFields.stream()
+                                .map(f -> scan.getRowType().getFieldNames().indexOf(f))
+                                .collect(Collectors.toList());
+                List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
+                for (int i = 0; i < joinKeys.size(); ++i) {
+                    if (acceptedFieldIndices.contains(joinKeys.get(i))) {
+                        dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
                     }
                 }
+
+                BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                        createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
+
+                isChanged = true;
+                return new BatchPhysicalDynamicFilteringTableSourceScan(
+                        scan.getCluster(),
+                        scan.getTraitSet(),
+                        scan.getHints(),
+                        tableSourceTable,
+                        dynamicFilteringDataCollector);
+            } else if (rel instanceof Exchange || rel instanceof Filter) {
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), joinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Project) {
+                List<RexNode> projects = ((Project) rel).getProjects();
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    return rel;
+                }
+
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Calc) {
+                Calc calc = (Calc) rel;
+                RexProgram program = calc.getProgram();
+                List<RexNode> projects =
+                        program.getProjectList().stream()
+                                .map(program::expandLocalRef)
+                                .collect(Collectors.toList());
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    return rel;
+                }
+
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Join) {
+                Join currentJoin = (Join) rel;
+                return currentJoin.copy(
+                        currentJoin.getTraitSet(),
+                        Arrays.asList(
+                                convertDppFactSide(
+                                        currentJoin.getLeft(),
+                                        getInputIndices(currentJoin, joinKeys, true),
+                                        dimSide,
+                                        dimSideJoinKey),
+                                convertDppFactSide(
+                                        currentJoin.getRight(),
+                                        getInputIndices(currentJoin, joinKeys, false),
+                                        dimSide,
+                                        dimSideJoinKey)));
+            } else if (rel instanceof Union) {
+                Union union = (Union) rel;
+                List<RelNode> newInputs = new ArrayList<>();
+                for (RelNode input : union.getInputs()) {
+                    newInputs.add(convertDppFactSide(input, joinKeys, dimSide, dimSideJoinKey));
+                }
+                return union.copy(union.getTraitSet(), newInputs, union.all);
+            } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+                BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
+                RelNode input = agg.getInput();
+                int[] grouping = agg.grouping();
+
+                // If one of joinKey in joinKeys are aggregate function field, dpp will not success.
+                for (int k : joinKeys) {
+                    if (k >= grouping.length) {
+                        return rel;
+                    }
+                }
+
+                RelNode convertedRel =
+                        convertDppFactSide(
+                                input,
+                                ImmutableIntList.copyOf(
+                                        joinKeys.stream()
+                                                .map(joinKey -> agg.grouping()[joinKey])
+                                                .collect(Collectors.toList())),
+                                dimSide,
+                                dimSideJoinKey);
+                return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel));
+            } else {
+                // TODO In the future, we need to support more operators to enrich matchable dpp
+                // pattern.
             }
-            CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
-            dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned();
-        } else if (rel instanceof HepRelVertex) {
-            visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors);
-        } else if (rel instanceof Exchange || rel instanceof Project) {
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        } else if (rel instanceof Calc) {
-            RexProgram origProgram = ((Calc) rel).getProgram();
-            if (origProgram.getCondition() != null
-                    && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
-                dimSideFactors.hasFilter = true;
+
+            return rel;
+        }
+
+        private static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+                DynamicTableSource tableSource, List<String> candidateFields) {
+            List<String> acceptedFilterFields =
+                    ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+            if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+                return new ArrayList<>();
             }
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        } else if (rel instanceof Filter) {
-            if (isSuitableFilter(((Filter) rel).getCondition())) {
-                dimSideFactors.hasFilter = true;
+
+            List<String> suitableFields = new ArrayList<>();
+            // If candidateField not in acceptedFilterFields means dpp rule will not be matched,
+            // because we can not prune any partitions according to non-accepted filter fields
+            // provided by partition table source.
+            for (String candidateField : candidateFields) {
+                if (acceptedFilterFields.contains(candidateField)) {
+                    suitableFields.add(candidateField);
+                }
             }
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        }
-    }
 
-    /**
-     * Not all filter condition suitable for using to filter partitions by dynamic partition pruning
-     * rules. For example, NOT NULL can only filter one default partition which have a small impact
-     * on filtering data.
-     */
-    private static boolean isSuitableFilter(RexNode filterCondition) {
-        switch (filterCondition.getKind()) {
-            case AND:
-                List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
-                return isSuitableFilter(conjunctions.get(0))
-                        || isSuitableFilter(conjunctions.get(1));
-            case OR:
-                List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
-                return isSuitableFilter(disjunctions.get(0))
-                        && isSuitableFilter(disjunctions.get(1));
-            case NOT:
-                return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
-            case EQUALS:
-            case GREATER_THAN:
-            case GREATER_THAN_OR_EQUAL:
-            case LESS_THAN:
-            case LESS_THAN_OR_EQUAL:
-            case NOT_EQUALS:
-            case IN:
-            case LIKE:
-            case CONTAINS:
-            case SEARCH:
-            case IS_FALSE:
-            case IS_NOT_FALSE:
-            case IS_NOT_TRUE:
-            case IS_TRUE:
-                // TODO adding more suitable filters which can filter enough partitions after using
-                // this filter in dynamic partition pruning.
-                return true;
-            default:
-                return false;
+            return suitableFields;
         }
-    }
 
-    /** Returns true if the source is FLIP-27 source, else false. */
-    private static boolean isNewSource(ScanTableSource scanTableSource) {
-        ScanTableSource.ScanRuntimeProvider provider =
-                scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        if (provider instanceof SourceProvider) {
-            return true;
-        } else if (provider instanceof TransformationScanProvider) {
-            Transformation<?> transformation =
-                    ((TransformationScanProvider) provider)
-                            .createTransformation(name -> Optional.empty());
-            return transformation instanceof SourceTransformation;
-        } else if (provider instanceof DataStreamScanProvider) {
-            // Suppose DataStreamScanProvider of sources that support dynamic filtering will use new
-            // Source. It's not reliable and should be checked.
-            // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is actually a
-            //  new source.
-            // This situation will not generate wrong result because it's handled when translating
-            // BatchTableSourceScan. The only effect is the physical plan and the exec node plan
-            // have DPP nodes, but they do not work in runtime.
-            return true;
+        private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
+                RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
+            final RelDataType outputType =
+                    ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
+                            .projectStructType(
+                                    dimSide.getRowType(),
+                                    dynamicFilteringFieldIndices.stream()
+                                            .mapToInt(i -> i)
+                                            .toArray());
+            return new BatchPhysicalDynamicFilteringDataCollector(
+                    dimSide.getCluster(),
+                    dimSide.getTraitSet(),
+                    ignoreExchange(dimSide),
+                    outputType,
+                    dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
         }
-        // TODO supports more
-        return false;
-    }
 
-    private static ImmutableIntList getInputIndices(
-            List<RexNode> projects, ImmutableIntList joinKeys) {
-        List<Integer> indices = new ArrayList<>();
-        for (int k : joinKeys) {
-            RexNode rexNode = projects.get(k);
-            if (rexNode instanceof RexInputRef) {
-                indices.add(((RexInputRef) rexNode).getIndex());
+        private static RelNode ignoreExchange(RelNode dimSide) {
+            if (dimSide instanceof Exchange) {
+                return dimSide.getInput(0);
+            } else {
+                return dimSide;
             }
         }
-        return ImmutableIntList.copyOf(indices);
-    }
-
-    /** This class is used to remember dim side messages while recurring in dim side. */
-    private static class DppDimSideFactors {
-        private boolean hasFilter;
-        private boolean hasNonPartitionedScan;
 
-        public boolean isDimSide() {
-            return hasFilter && hasNonPartitionedScan;
+        /** Returns true if the source is FLIP-27 source, else false. */
+        private static boolean isNewSource(ScanTableSource scanTableSource) {
+            ScanTableSource.ScanRuntimeProvider provider =
+                    scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+            if (provider instanceof SourceProvider) {
+                return true;
+            } else if (provider instanceof TransformationScanProvider) {
+                Transformation<?> transformation =
+                        ((TransformationScanProvider) provider)
+                                .createTransformation(name -> Optional.empty());
+                return transformation instanceof SourceTransformation;
+            } else if (provider instanceof DataStreamScanProvider) {
+                // Suppose DataStreamScanProvider of sources that support dynamic filtering will use
+                // new Source. It's not reliable and should be checked.
+                // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is
+                // actually a new source. This situation will not generate wrong result because it's
+                // handled when translating BatchTableSourceScan. The only effect is the physical
+                // plan and the exec node plan have DPP nodes, but they do not work in runtime.
+                return true;
+            }
+            // TODO supports more
+            return false;
         }
-    }
 
-    /** This class is used to remember fact side messages while recurring in fact side. */
-    private static class DppFactSideFactors {
-        private boolean isSuitableFactScanSource;
-        // If join key is not changed in fact side, this value is always true.
-        private boolean isSuitableJoinKey = true;
+        private static ImmutableIntList getInputIndices(
+                List<RexNode> projects, ImmutableIntList joinKeys) {
+            List<Integer> indices = new ArrayList<>();
+            for (int k : joinKeys) {
+                RexNode rexNode = projects.get(k);
+                if (rexNode instanceof RexInputRef) {
+                    indices.add(((RexInputRef) rexNode).getIndex());
+                }
+            }
+            return ImmutableIntList.copyOf(indices);
+        }
 
-        public boolean isFactSide() {
-            return isSuitableFactScanSource && isSuitableJoinKey;
+        private static ImmutableIntList getInputIndices(
+                Join join, ImmutableIntList joinKeys, boolean isLeft) {
+            List<Integer> indices = new ArrayList<>();
+            RelNode left = join.getLeft();
+            int leftSize = left.getRowType().getFieldCount();
+            for (int k : joinKeys) {
+                if (isLeft) {
+                    if (k < leftSize) {
+                        indices.add(k);
+                    }
+                } else {
+                    if (k >= leftSize) {
+                        indices.add(k - leftSize);
+                    }
+                }
+            }
+            return ImmutableIntList.copyOf(indices);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
index b20868b1835..f5dec96af68 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch._
 import org.apache.flink.table.planner.plan.stats.ValueInterval
 import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, SortUtil}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasTimeIntervalType, toLong}
-import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.adapter.enumerable.EnumerableLimit
@@ -336,22 +335,11 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
       fmq.getSelectivity(joinWithOnlyEquiPred, nonEquiPred)
     }
 
-    // Currently, join-reorder is before dynamic partition pruning rewrite. This factor
-    // is adding to adjust join cost for these join node which meets dynamic partition
-    // pruning pattern. Try best to reorder the fact table and fact table together to
-    // make DPP succeed.
-    val dynamicPartitionPruningFactor =
-      if (DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join)) {
-        0.0001
-      } else {
-        1
-      }
-
     if (leftNdv != null && rightNdv != null) {
       // selectivity of equi part is 1 / Max(leftNdv, rightNdv)
       val selectivityOfEquiPred = Math.min(1d, 1d / Math.max(leftNdv, rightNdv))
       return leftRowCount * rightRowCount * selectivityOfEquiPred *
-        selectivityOfNonEquiPred * dynamicPartitionPruningFactor
+        selectivityOfNonEquiPred
     }
 
     val leftKeysAreUnique = fmq.areColumnsUnique(leftChild, leftKeySet)
@@ -369,14 +357,14 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
       } else {
         leftRowCount * selectivityOfNonEquiPred
       }
-      return outputRowCount * dynamicPartitionPruningFactor
+      return outputRowCount
     }
 
     // if joinCondition has no ndv stats and no uniqueKeys stats,
     // rowCount = (leftRowCount + rightRowCount) * join condition selectivity
     val crossJoin = copyJoinWithNewCondition(join, rexBuilder.makeLiteral(true))
     val selectivity = fmq.getSelectivity(crossJoin, condition)
-    (leftRowCount + rightRowCount) * selectivity * dynamicPartitionPruningFactor
+    (leftRowCount + rightRowCount) * selectivity
   }
 
   private def copyJoinWithNewCondition(join: Join, newCondition: RexNode): Join = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
index f64197c3836..a1784867149 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
@@ -40,6 +40,7 @@ object FlinkBatchProgram {
   val TIME_INDICATOR = "time_indicator"
   val PHYSICAL = "physical"
   val PHYSICAL_REWRITE = "physical_rewrite"
+  val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"
 
   def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
     val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]()
@@ -288,6 +289,9 @@ object FlinkBatchProgram {
         .build()
     )
 
+    // convert dynamic partition pruning scan source
+    chainedProgram.addLast(DYNAMIC_PARTITION_PRUNING, new FlinkDynamicPartitionPruningProgram)
+
     chainedProgram
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 39fa34cbaf6..6551d936c58 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -433,18 +433,13 @@ object FlinkBatchRuleSets {
 
   /** RuleSet to optimize plans after batch exec execution. */
   val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
-    (RuleSets
-      .ofList(
-        EnforceLocalHashAggRule.INSTANCE,
-        EnforceLocalSortAggRule.INSTANCE,
-        PushLocalHashAggIntoScanRule.INSTANCE,
-        PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
-        PushLocalSortAggIntoScanRule.INSTANCE,
-        PushLocalSortAggWithSortIntoScanRule.INSTANCE,
-        PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
-        PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
-      )
-      .asScala ++
-      DynamicPartitionPruningRule.DYNAMIC_PARTITION_PRUNING_RULES.asScala).asJava
+    EnforceLocalHashAggRule.INSTANCE,
+    EnforceLocalSortAggRule.INSTANCE,
+    PushLocalHashAggIntoScanRule.INSTANCE,
+    PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
+    PushLocalSortAggIntoScanRule.INSTANCE,
+    PushLocalSortAggWithSortIntoScanRule.INSTANCE,
+    PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
+    PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
   )
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
similarity index 76%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
index 0a7f0c7bc9d..cef12b156aa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.rules.physical.batch;
+package org.apache.flink.table.planner.plan.optimize.program;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -40,10 +40,10 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Test for rules that extend {@link DynamicPartitionPruningRule} to create {@link
+ * Tests for rules that extend {@link FlinkDynamicPartitionPruningProgram} to create {@link
  * org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan}.
  */
-public class DynamicPartitionPruningRuleTest extends TableTestBase {
+public class DynamicPartitionPruningProgramTest extends TableTestBase {
     private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
     private final TestValuesCatalog catalog =
             new TestValuesCatalog("testCatalog", "test_database", true);
@@ -381,7 +381,7 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
 
     @Test
     public void testComplexDimSideWithJoinInDimSide() {
-        // Dim side contains join will not succeed in this version, it will improve later.
+        // TODO, Dpp will not success with complex dim side.
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE sales (\n"
@@ -434,13 +434,175 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
         util.verifyRelPlan(query);
     }
 
-    // --------------------------dpp factor test ---------------------------------------------
+    @Test
+    public void testDppWithoutJoinReorder() {
+        // Dpp will success
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Join reorder don't open.
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
+
+        String query =
+                "Select * from fact_part, item, dim"
+                        + " where fact_part.fact_date_sk = dim.dim_date_sk"
+                        + " and fact_part.id = item.id"
+                        + " and dim.id = item.id "
+                        + " and dim.price < 500 and dim.price > 300";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithSubQuery() {
+        // Dpp will success
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Join reorder don't open.
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false);
+
+        String query =
+                "Select * from fact_part, item, dim"
+                        + " where fact_part.id = item.id"
+                        + " and dim.price in (select price from dim where amount = (select amount from dim where amount = 2000))"
+                        + " and fact_part.fact_date_sk = dim.dim_date_sk";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithUnionInFactSide() {
+        // Dpp will success.
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        String query =
+                "Select * from (select id, fact_date_sk, amount + 1 as amount1 from fact_part where price = 1 union all "
+                        + "select id, fact_date_sk, amount + 1 from fact_part where price = 2) fact_part2, item, dim"
+                        + " where fact_part2.fact_date_sk = dim.dim_date_sk"
+                        + " and fact_part2.id = item.id"
+                        + " and dim.price < 500 and dim.price > 300";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithAggInFactSideAndJoinKeyInGrouping() {
+        // Dpp will success
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        String query =
+                "Select * from (Select fact_date_sk, item.amount, sum(fact_part.price) from fact_part "
+                        + "join item on fact_part.id = item.id group by fact_date_sk, item.amount) t1 "
+                        + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithAggInFactSideAndJoinKeyInGroupFunction() {
+        // Dpp will not success because join key in group function.
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        String query =
+                "Select * from (Select fact_part.id, item.amount, fact_part.name, sum(fact_part.price), sum(item.price), sum(fact_date_sk) as fact_date_sk1 "
+                        + "from fact_part join item on fact_part.id = item.id "
+                        + "group by fact_part.id, fact_part.name, item.amount) t1 "
+                        + "join dim on t1.fact_date_sk1 = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithAggInFactSideWithAggPushDownEnable() {
+        // Dpp will not success while fact side source support agg push down and source agg push
+        // down enabled is true.
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        String query =
+                "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) "
+                        + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 "
+                        + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDppWithAggInFactSideWithAggPushDownDisable() {
+        // Dpp will success while fact side source support agg push down but source agg push down
+        // enabled is false.
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Disable source agg push down.
+        tableConfig.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, false);
+
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        String query =
+                "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) "
+                        + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 "
+                        + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ";
+        util.verifyRelPlan(query);
+    }
 
     @Test
-    public void testDPPFactorToReorderTableWithoutStats() {
-        // While there are several joins, and fact table not adjacent to dim table directly. dynamic
-        // partition pruning factor will try best to reorder join relations to make fact table
-        // adjacent to dim table.
+    public void testDPPWithJoinReorderTableWithoutStats() {
+        // Dpp will success.
         String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
@@ -465,7 +627,7 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
     }
 
     @Test
-    public void testDPPFactorToReorderTableWithStats() throws TableNotExistException {
+    public void testDPPWithJoinReorderTableWithStats() throws TableNotExistException {
         String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
@@ -518,8 +680,8 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
     }
 
     @Test
-    public void testDPPFactorWithFactSideJoinKeyChanged() {
-        // If partition keys changed in fact side. DPP factor will not work.
+    public void testDPPWithFactSideJoinKeyChanged() {
+        // If partition keys changed in fact side. DPP factor will not success.
         String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
@@ -543,8 +705,8 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
     }
 
     @Test
-    public void testDPPFactorWithDimSideJoinKeyChanged() {
-        // Although partition keys changed in dim side. DPP factor will work.
+    public void testDPPWithDimSideJoinKeyChanged() {
+        // Although partition keys changed in dim side. DPP will success.
         String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
@@ -568,9 +730,9 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
     }
 
     @Test
-    public void testDPPFactorWithJoinKeysNotIncludePartitionKeys() {
-        // If join keys of partition table join with dim table not include partition keys, dpp
-        // factor will not be adjusted and dpp will not succeed.
+    public void testDPPWithJoinKeysNotIncludePartitionKeys() {
+        // If join keys of partition table join with dim table not include partition keys, dpp will
+        // not success.
         String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
similarity index 72%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
index b9d9dfc726b..5ff1bafdb97 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml
@@ -91,7 +91,13 @@ LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], d
       <![CDATA[
 HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1], build=[right])
 :- Exchange(distribution=[hash[fact_date_sk]])
-:  +- TableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+:  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+:     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:        +- HashAggregate(isMerge=[true], groupBy=[dim_date_sk], select=[dim_date_sk, Final_SUM(sum$0) AS EXPR$1])
+:           +- Exchange(distribution=[hash[dim_date_sk]])
+:              +- LocalHashAggregate(groupBy=[dim_date_sk], select=[dim_date_sk, Partial_SUM(price) AS sum$0])
+:                 +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
 +- HashAggregate(isMerge=[true], groupBy=[dim_date_sk], select=[dim_date_sk, Final_SUM(sum$0) AS EXPR$1])
    +- Exchange(distribution=[hash[dim_date_sk]])
       +- LocalHashAggregate(groupBy=[dim_date_sk], select=[dim_date_sk, Partial_SUM(price) AS sum$0])
@@ -237,72 +243,144 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorToReorderTableWithoutStats">
+  <TestCase name="testDppWithAggInFactSideAndJoinKeyInGroupFunction">
     <Resource name="sql">
-      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+      <![CDATA[Select * from (Select fact_part.id, item.amount, fact_part.name, sum(fact_part.price), sum(item.price), sum(fact_date_sk) as fact_date_sk1 from fact_part join item on fact_part.id = item.id group by fact_part.id, fact_part.name, item.amount) t1 join dim on t1.fact_date_sk1 = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
-+- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalJoin(condition=[true], joinType=[inner])
-      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
-      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+LogicalProject(id=[$0], amount=[$1], name=[$2], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5], id0=[$6], male=[$7], amount0=[$8], price=[$9], dim_date_sk=[$10])
++- LogicalFilter(condition=[AND(<($9, 500), >($9, 300))])
+   +- LogicalJoin(condition=[=($5, $10)], joinType=[inner])
+      :- LogicalProject(id=[$0], amount=[$2], name=[$1], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5])
+      :  +- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)], EXPR$4=[SUM($4)], fact_date_sk1=[SUM($5)])
+      :     +- LogicalProject(id=[$0], name=[$1], amount=[$6], price=[$3], price0=[$7], fact_date_sk=[$4])
+      :        +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :           :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :           +- LogicalTableScan(table=[[testCatalog, test_database, item]])
       +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
-+- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
-   :- Exchange(distribution=[hash[id]])
-   :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
-   +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
-      :- Exchange(distribution=[hash[id]])
-      :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-      :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-      :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
-      :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
-      +- Exchange(distribution=[hash[id]])
-         +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
-            +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk1, dim_date_sk)], select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1, id0, male, amount0, price, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk1]])
+:  +- Calc(select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1])
+:     +- HashAggregate(isMerge=[false], groupBy=[id, name, amount], select=[id, name, amount, SUM(price) AS EXPR$3, SUM(price0) AS EXPR$4, SUM(fact_date_sk) AS fact_date_sk1])
+:        +- Calc(select=[id, name, amount, price, price0, fact_date_sk])
+:           +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, price, fact_date_sk, id0, amount, price0], build=[right])
+:              :- Exchange(distribution=[hash[id]])
+:              :  +- TableSourceScan(table=[[testCatalog, test_database, fact_part, project=[id, name, price, fact_date_sk], metadata=[]]], fields=[id, name, price, fact_date_sk])
+:              +- Exchange(distribution=[hash[id]])
+:                 +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorToReorderTableWithStats">
+  <TestCase name="testDppWithAggInFactSideAndJoinKeyInGrouping">
     <Resource name="sql">
-      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+      <![CDATA[Select * from (Select fact_date_sk, item.amount, sum(fact_part.price) from fact_part join item on fact_part.id = item.id group by fact_date_sk, item.amount) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
-+- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalJoin(condition=[true], joinType=[inner])
-      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
-      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+LogicalProject(fact_date_sk=[$0], amount=[$1], EXPR$2=[$2], id=[$3], male=[$4], amount0=[$5], price=[$6], dim_date_sk=[$7])
++- LogicalFilter(condition=[AND(<($6, 500), >($6, 300))])
+   +- LogicalJoin(condition=[=($0, $7)], joinType=[inner])
+      :- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
+      :  +- LogicalProject(fact_date_sk=[$4], amount=[$6], price=[$3])
+      :     +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :        :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :        +- LogicalTableScan(table=[[testCatalog, test_database, item]])
       +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
-   :- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
-   +- Exchange(distribution=[broadcast])
-      +- NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
-         :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-         :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-         :     +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
-         :        +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
-         +- Exchange(distribution=[broadcast])
-            +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
-               +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[fact_date_sk, amount, EXPR$2, id, male, amount0, price, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- HashAggregate(isMerge=[true], groupBy=[fact_date_sk, amount], select=[fact_date_sk, amount, Final_SUM(sum$0) AS EXPR$2])
+:     +- Exchange(distribution=[hash[fact_date_sk, amount]])
+:        +- LocalHashAggregate(groupBy=[fact_date_sk, amount], select=[fact_date_sk, amount, Partial_SUM(price) AS sum$0])
+:           +- Calc(select=[fact_date_sk, amount, price])
+:              +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, price, fact_date_sk, id0, amount], build=[right])
+:                 :- Exchange(distribution=[hash[id]])
+:                 :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[id, price, fact_date_sk], metadata=[]]], fields=[id, price, fact_date_sk])
+:                 :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+:                 :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:                 +- Exchange(distribution=[hash[id]])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, item, project=[id, amount], metadata=[]]], fields=[id, amount])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDppWithAggInFactSideWithAggPushDownDisable">
+    <Resource name="sql">
+      <![CDATA[Select * from (Select id, amount, fact_date_sk, count(name), sum(price) from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], amount=[$1], fact_date_sk=[$2], EXPR$3=[$3], EXPR$4=[$4], id0=[$5], male=[$6], amount0=[$7], price=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(<($8, 500), >($8, 300))])
+   +- LogicalJoin(condition=[=($2, $9)], joinType=[inner])
+      :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)])
+      :  +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3])
+      :     +- LogicalFilter(condition=[>($4, 100)])
+      :        +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, amount, fact_date_sk, EXPR$3, EXPR$4, id0, male, amount0, price, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- HashAggregate(isMerge=[true], groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Final_COUNT(count$0) AS EXPR$3, Final_SUM(sum$1) AS EXPR$4])
+:     +- Exchange(distribution=[hash[id, amount, fact_date_sk]])
+:        +- LocalHashAggregate(groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Partial_COUNT(name) AS count$0, Partial_SUM(price) AS sum$1])
+:           +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, partitions=[{fact_date_sk=1990}, {fact_date_sk=1991}, {fact_date_sk=1992}]]], fields=[id, name, amount, price, fact_date_sk])
+:              +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDppWithAggInFactSideWithAggPushDownEnable">
+    <Resource name="sql">
+      <![CDATA[Select * from (Select id, amount, fact_date_sk, count(name), sum(price) from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], amount=[$1], fact_date_sk=[$2], EXPR$3=[$3], EXPR$4=[$4], id0=[$5], male=[$6], amount0=[$7], price=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(<($8, 500), >($8, 300))])
+   +- LogicalJoin(condition=[=($2, $9)], joinType=[inner])
+      :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)])
+      :  +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3])
+      :     +- LogicalFilter(condition=[>($4, 100)])
+      :        +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, amount, fact_date_sk, EXPR$3, EXPR$4, id0, male, amount0, price, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- HashAggregate(isMerge=[true], groupBy=[id, amount, fact_date_sk], select=[id, amount, fact_date_sk, Final_COUNT(count$0) AS EXPR$3, Final_SUM(sum$1) AS EXPR$4])
+:     +- Exchange(distribution=[hash[id, amount, fact_date_sk]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, fact_part, partitions=[{fact_date_sk=1990}, {fact_date_sk=1991}, {fact_date_sk=1992}], aggregates=[grouping=[id,amount,fact_date_sk], aggFunctions=[CountAggFunction(name),LongSumAggFunction(price)]]]], fields=[id, amount, fact_date_sk, count$0, sum$1])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorWithDimSideJoinKeyChanged">
+  <TestCase name="testDPPWithDimSideJoinKeyChanged">
     <Resource name="sql">
       <![CDATA[Select * from fact_part join item on fact_part.id = item.id join (select dim_date_sk + 1 as dim_date_sk, price from dim) dim1 on fact_part.fact_date_sk = dim1.dim_date_sk where dim1.price < 500 and dim1.price > 300]]>
     </Resource>
@@ -337,7 +415,7 @@ Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorWithFactSideJoinKeyChanged">
+  <TestCase name="testDPPWithFactSideJoinKeyChanged">
     <Resource name="sql">
       <![CDATA[Select * from (select fact_date_sk + 1 as fact_date_sk, id from fact_part) fact_part1 join item on fact_part1.id = item.id join dim on fact_part1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300]]>
     </Resource>
@@ -370,7 +448,7 @@ Calc(select=[fact_date_sk, id0 AS id, id AS id0, amount, price, id00 AS id1, mal
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorWithJoinKeysNotIncludePartitionKeys">
+  <TestCase name="testDPPWithJoinKeysNotIncludePartitionKeys">
     <Resource name="sql">
       <![CDATA[Select * from fact_part, item, dim where fact_part.id = dim.id and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
     </Resource>
@@ -396,6 +474,213 @@ HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, pri
    +- Exchange(distribution=[hash[id]])
       +- Calc(select=[id, male, amount, price, dim_date_sk], where=[AND(SEARCH(price, Sarg[(300..500)]), IS NOT NULL(id))])
          +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDPPWithJoinReorderTableWithoutStats">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+   :- Exchange(distribution=[hash[id]])
+   :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
+      :- Exchange(distribution=[hash[id]])
+      :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+      :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+      :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+      +- Exchange(distribution=[hash[id]])
+         +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+            +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDPPWithJoinReorderTableWithStats">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- NestedLoopJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+   :- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- Exchange(distribution=[broadcast])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
+         :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+         :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+         :     +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+         :        +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+         +- Exchange(distribution=[broadcast])
+            +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+               +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDppWithoutJoinReorder">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id1, id0))], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk, id0]])
+:  +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+:     :- Exchange(distribution=[hash[id]])
+:     :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+:     :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:     :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+:     :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- Exchange(distribution=[hash[id]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk, id]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDppWithSubQuery">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.id = item.id and dim.price in (select price from dim where amount = (select amount from dim where amount = 2000)) and fact_part.fact_date_sk = dim.dim_date_sk]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($0, $5), IN($11, {
+LogicalProject(price=[$3])
+  LogicalFilter(condition=[=($2, $SCALAR_QUERY({
+LogicalProject(amount=[$2])
+  LogicalFilter(condition=[=($2, 2000)])
+    LogicalTableScan(table=[[testCatalog, test_database, dim]])
+}))])
+    LogicalTableScan(table=[[testCatalog, test_database, dim]])
+}), =($4, $12))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+:     :- Exchange(distribution=[hash[id]])
+:     :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+:     :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:     :        +- HashJoin(joinType=[LeftSemiJoin], where=[=(price, price0)], select=[id, male, amount, price, dim_date_sk], build=[right])
+:     :           :- Exchange(distribution=[hash[price]])
+:     :           :  +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, male, amount, price, dim_date_sk])
+:     :           +- Exchange(distribution=[hash[price]])
+:     :              +- Calc(select=[price])
+:     :                 +- NestedLoopJoin(joinType=[InnerJoin], where=[=(amount, $f0)], select=[amount, price, $f0], build=[right], singleRowJoin=[true])
+:     :                    :- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price], metadata=[]]], fields=[amount, price])
+:     :                    +- Exchange(distribution=[broadcast])
+:     :                       +- HashAggregate(isMerge=[true], select=[Final_SINGLE_VALUE(value$0, count$1) AS $f0])
+:     :                          +- Exchange(distribution=[single])
+:     :                             +- LocalHashAggregate(select=[Partial_SINGLE_VALUE(amount) AS (value$0, count$1)])
+:     :                                +- Calc(select=[CAST(2000 AS BIGINT) AS amount], where=[=(amount, 2000)])
+:     :                                   +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[amount], metadata=[]]], fields=[amount])
+:     +- Exchange(distribution=[hash[id]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- HashJoin(joinType=[LeftSemiJoin], where=[=(price, price0)], select=[id, male, amount, price, dim_date_sk], build=[right])
+      :- Exchange(distribution=[hash[price]])
+      :  +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, male, amount, price, dim_date_sk])
+      +- Exchange(distribution=[hash[price]])
+         +- Calc(select=[price])
+            +- NestedLoopJoin(joinType=[InnerJoin], where=[=(amount, $f0)], select=[amount, price, $f0], build=[right], singleRowJoin=[true])
+               :- TableSourceScan(table=[[testCatalog, test_database, dim, project=[amount, price], metadata=[]]], fields=[amount, price])
+               +- Exchange(distribution=[broadcast])
+                  +- HashAggregate(isMerge=[true], select=[Final_SINGLE_VALUE(value$0, count$1) AS $f0])
+                     +- Exchange(distribution=[single])
+                        +- LocalHashAggregate(select=[Partial_SINGLE_VALUE(amount) AS (value$0, count$1)])
+                           +- Calc(select=[CAST(2000 AS BIGINT) AS amount], where=[=(amount, 2000)])
+                              +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[amount], metadata=[]]], fields=[amount])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDppWithUnionInFactSide">
+    <Resource name="sql">
+      <![CDATA[Select * from (select id, fact_date_sk, amount + 1 as amount1 from fact_part where price = 1 union all select id, fact_date_sk, amount + 1 from fact_part where price = 2) fact_part2, item, dim where fact_part2.fact_date_sk = dim.dim_date_sk and fact_part2.id = item.id and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], fact_date_sk=[$1], amount1=[$2], id0=[$3], amount=[$4], price=[$5], id1=[$6], male=[$7], amount0=[$8], price0=[$9], dim_date_sk=[$10])
++- LogicalFilter(condition=[AND(=($1, $10), =($0, $3), <($9, 500), >($9, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalUnion(all=[true])
+      :  :  :- LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[+($2, 1)])
+      :  :  :  +- LogicalFilter(condition=[=($3, 1)])
+      :  :  :     +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  :  +- LogicalProject(id=[$0], fact_date_sk=[$4], EXPR$2=[+($2, 1)])
+      :  :     +- LogicalFilter(condition=[=($3, 2)])
+      :  :        +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, fact_date_sk, amount1, id0, amount, price, id1, male, amount0, price0, dim_date_sk], build=[left])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, fact_date_sk, amount1, id0, amount, price], build=[left])
+:     :- Exchange(distribution=[hash[id]])
+:     :  +- Union(all=[true], union=[id, fact_date_sk, amount1])
+:     :     :- Calc(select=[id, fact_date_sk, +(amount, 1) AS amount1], where=[=(price, 1)])
+:     :     :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, price, fact_date_sk])
+:     :     :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:     :     :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+:     :     :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     :     +- Calc(select=[id, fact_date_sk, +(amount, 1) AS EXPR$2], where=[=(price, 2)])
+:     :        +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, price, fact_date_sk])
+:     :           +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+:     :              +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+:     :                 +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- Exchange(distribution=[hash[id]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
 ]]>
     </Resource>
   </TestCase>