Posted to commits@flink.apache.org by go...@apache.org on 2022/08/24 02:18:19 UTC

[flink] branch master updated (ee4d27411b3 -> 1ed1deb351b)

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from ee4d27411b3 [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587)
     new 1a0f591a59b [FLINK-28993][table-planner] Refactor SupportsDynamicFiltering to provide getAcceptedFilterFields method to avoid modifying the DynamicTableSource object when calling the applyDynamicFiltering method in join reorder rules
     new 1ed1deb351b [FLINK-28993][table-planner] Fix adjusting join cost for dpp query pattern error

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/connectors/hive/HiveTableSource.java     |  66 +++-----
 .../source/abilities/SupportsDynamicFiltering.java |  18 ++-
 .../batch/DynamicPartitionPruningRule.java         |  52 +------
 .../utils/DynamicPartitionPruningUtils.java        |  30 +++-
 .../planner/plan/metadata/FlinkRelMdRowCount.scala |   4 +-
 .../planner/factories/TestValuesTableFactory.java  |  21 +--
 .../batch/DynamicPartitionPruningRuleTest.java     | 133 ++++++++++++++--
 .../batch/DynamicPartitionPruningRuleTest.xml      | 171 ++++++++++++++++-----
 8 files changed, 330 insertions(+), 165 deletions(-)


[flink] 01/02: [FLINK-28993][table-planner] Refactor SupportsDynamicFiltering to provide getAcceptedFilterFields method to avoid modifying the DynamicTableSource object when calling the applyDynamicFiltering method in join reorder rules

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a0f591a59b59f0c6ce71f5af9e0660293c33fc1
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Tue Aug 23 11:52:53 2022 +0800

    [FLINK-28993][table-planner] Refactor SupportsDynamicFiltering to provide getAcceptedFilterFields method to avoid modifying the DynamicTableSource object when calling the applyDynamicFiltering method in join reorder rules
    
    This closes #20596
---
 .../flink/connectors/hive/HiveTableSource.java     | 66 ++++++++--------------
 .../source/abilities/SupportsDynamicFiltering.java | 18 ++++--
 .../batch/DynamicPartitionPruningRule.java         | 52 +++--------------
 .../utils/DynamicPartitionPruningUtils.java        | 21 +++++++
 .../planner/factories/TestValuesTableFactory.java  | 21 ++-----
 5 files changed, 74 insertions(+), 104 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 2570c50b702..edb752ab7e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -40,6 +40,7 @@ import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -76,7 +77,6 @@ import javax.annotation.Nullable;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -86,8 +86,6 @@ import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.P
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
 import static org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /** A TableSource implementation to read data from Hive tables. */
 public class HiveTableSource
@@ -252,47 +250,33 @@ public class HiveTableSource
     }
 
     @Override
-    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-        if (catalogTable.getPartitionKeys() != null
-                && catalogTable.getPartitionKeys().size() != 0) {
-            checkArgument(
-                    !candidateFilterFields.isEmpty(),
-                    "At least one field should be provided for dynamic filtering");
-
-            // only accept partition fields of supported types to do dynamic partition pruning
-            List<String> dynamicFilterPartitionKeys = new ArrayList<>();
-            for (String field : candidateFilterFields) {
-                if (catalogTable.getPartitionKeys().contains(field)
-                        && HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
-                                catalogTable
-                                        .getSchema()
-                                        .getFieldDataType(field)
-                                        .map(DataType::getLogicalType)
-                                        .map(LogicalType::getTypeRoot)
-                                        .orElse(null))) {
-                    dynamicFilterPartitionKeys.add(field);
-                }
-            }
-            if (dynamicFilterPartitionKeys.isEmpty()) {
-                LOG.warn(
-                        "No dynamic filter field is accepted,"
-                                + " only partition fields can use for dynamic filtering.");
+    public List<String> listAcceptedFilterFields() {
+        List<String> acceptedFilterFields = new ArrayList<>();
+        for (String partitionKey : catalogTable.getPartitionKeys()) {
+            // Only partition keys with supported types can be returned as accepted filter fields.
+            if (HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
+                    catalogTable
+                            .getSchema()
+                            .getFieldDataType(partitionKey)
+                            .map(DataType::getLogicalType)
+                            .map(LogicalType::getTypeRoot)
+                            .orElse(null))) {
+                acceptedFilterFields.add(partitionKey);
             }
+        }
+
+        return acceptedFilterFields;
+    }
 
-            // sort before check to ensure the lists have same elements in same order
-            dynamicFilterPartitionKeys.sort(String::compareTo);
-            checkState(
-                    this.dynamicFilterPartitionKeys == null
-                            || this.dynamicFilterPartitionKeys.equals(dynamicFilterPartitionKeys),
-                    "Dynamic filtering is applied twice but with different keys: %s != %s",
-                    this.dynamicFilterPartitionKeys,
-                    dynamicFilterPartitionKeys);
-
-            this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
-            return dynamicFilterPartitionKeys;
+    @Override
+    public void applyDynamicFiltering(List<String> candidateFilterFields) {
+        if (catalogTable.isPartitioned()) {
+            dynamicFilterPartitionKeys = candidateFilterFields;
         } else {
-            LOG.warn("No dynamic filter field is accepted since the table is non-partitioned.");
-            return Collections.emptyList();
+            throw new TableException(
+                    String.format(
+                            "Hive source table %s is not a partitioned table, but dynamic filtering was applied to it.",
+                            catalogTable));
         }
     }
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java
index ca84a9c90df..01a3fa5daf9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java
@@ -60,9 +60,19 @@ import java.util.List;
 public interface SupportsDynamicFiltering {
 
     /**
-     * Applies the candidate filter fields into the table source, and return the accepted fields.
-     * The data corresponding the filter fields will be provided in runtime, which can be used to
-     * filter the partitions or the input data.
+     * Returns the filter fields this partitioned table source supports. This method tells the
+     * planner which fields can be used as dynamic filtering fields; the planner will pick some of
+     * the returned fields based on the query and create the dynamic filtering operator.
      */
-    List<String> applyDynamicFiltering(List<String> candidateFilterFields);
+    List<String> listAcceptedFilterFields();
+
+    /**
+     * Applies the candidate filter fields to the table source. The data corresponding to the
+     * filter fields will be provided at runtime and can be used to filter the partitions or the
+     * input data.
+     *
+     * <p>NOTE: the candidate filter fields are always from the result of {@link
+     * #listAcceptedFilterFields()}.
+     */
+    void applyDynamicFiltering(List<String> candidateFilterFields);
 }
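
For connector authors, the split means listAcceptedFilterFields() must stay side-effect free while applyDynamicFiltering() carries the state change. A minimal sketch of the new contract (class and field names are illustrative, not from the Flink codebase):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;

    /** Illustrative ability implementation; a real connector also implements ScanTableSource. */
    public class SketchPartitionedSource implements SupportsDynamicFiltering {

        // Hypothetical partition keys, named after the tables used in the tests below.
        private final List<String> partitionKeys = Arrays.asList("fact_date_sk");

        private List<String> dynamicFilteringFields;

        @Override
        public List<String> listAcceptedFilterFields() {
            // Side-effect free: report the fields the source can prune on, so the planner
            // can probe during join reordering without mutating the source.
            return new ArrayList<>(partitionKeys);
        }

        @Override
        public void applyDynamicFiltering(List<String> candidateFilterFields) {
            // Called with fields drawn from listAcceptedFilterFields(); only now
            // does the source change state.
            this.dynamicFilteringFields = candidateFilterFields;
        }
    }
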
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
index c56cca21ef9..b0767d27f05 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.plan.rules.physical.batch;
 
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
@@ -139,29 +138,19 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
                             .collect(Collectors.toList());
         }
 
-        List<String> acceptedFields =
-                ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(candidateFields);
-
-        if (acceptedFields == null) {
-            return new ArrayList<>();
-        }
-
-        for (String field : acceptedFields) {
-            if (!candidateFields.contains(field)) {
-                throw new TableException(
-                        String.format(
-                                "Field: %s does not exist in the given fields: %s, "
-                                        + "please verify the applyDynamicFiltering method in connector: %s",
-                                field, candidateFields, tableSource.asSummaryString()));
-            }
-        }
+        List<String> acceptedFilterFields =
+                DynamicPartitionPruningUtils.getSuitableDynamicFilteringFieldsInFactSide(
+                        tableSource, candidateFields);
+        // Apply suitable accepted filter fields to source.
+        ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
 
         if (factCalc == null) {
-            return acceptedFields.stream()
+            return acceptedFilterFields.stream()
                     .map(f -> factScan.getRowType().getFieldNames().indexOf(f))
                     .collect(Collectors.toList());
         } else {
-            return getAcceptedFieldsIndicesInCalc(acceptedFields, factJoinKeys, factCalc, factScan);
+            return getAcceptedFieldsIndicesInCalc(
+                    acceptedFilterFields, factJoinKeys, factCalc, factScan);
         }
     }
 
@@ -203,10 +192,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
         List<Integer> acceptedFieldIndices =
                 getAcceptedFieldIndices(factJoinKeys, factCalc, factScan, tableSource);
 
-        if (acceptedFieldIndices.isEmpty()) {
-            return null;
-        }
-
         List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
         for (int i = 0; i < joinInfo.leftKeys.size(); ++i) {
             if (acceptedFieldIndices.contains(factJoinKeys.get(i))) {
@@ -293,9 +278,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
-            if (newFactScan == null) {
-                return;
-            }
             final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newFactScan));
             call.transformTo(newJoin);
         }
@@ -348,9 +330,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
-            if (newFactScan == null) {
-                return;
-            }
             final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newFactScan, dimSide));
             call.transformTo(newJoin);
         }
@@ -409,9 +388,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false);
-            if (newFactScan == null) {
-                return;
-            }
             final BatchPhysicalExchange newExchange =
                     (BatchPhysicalExchange)
                             exchange.copy(
@@ -474,9 +450,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true);
-            if (newFactScan == null) {
-                return;
-            }
             final BatchPhysicalExchange newExchange =
                     (BatchPhysicalExchange)
                             exchange.copy(
@@ -601,9 +574,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
-            if (newFactScan == null) {
-                return;
-            }
             final BatchPhysicalCalc newCalc =
                     (BatchPhysicalCalc)
                             factCalc.copy(
@@ -677,9 +647,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false);
-            if (newFactScan == null) {
-                return;
-            }
             final BatchPhysicalCalc newCalc =
                     (BatchPhysicalCalc)
                             factCalc.copy(
@@ -757,9 +724,6 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
 
             final BatchPhysicalDynamicFilteringTableSourceScan newFactScan =
                     createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true);
-            if (newFactScan == null) {
-                return;
-            }
             final BatchPhysicalCalc newCalc =
                     (BatchPhysicalCalc)
                             factCalc.copy(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index c4167348ec6..ffad5054d5d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -202,6 +202,27 @@ public class DynamicPartitionPruningUtils {
         }
     }
 
+    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+            DynamicTableSource tableSource, List<String> candidateFields) {
+        List<String> acceptedFilterFields =
+                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<String> suitableFields = new ArrayList<>();
+        // Candidate fields that are not in acceptedFilterFields are dropped: the dpp rule cannot
+        // match on them, because no partitions can be pruned using filter fields the partition
+        // table source does not accept.
+        for (String candidateField : candidateFields) {
+            if (acceptedFilterFields.contains(candidateField)) {
+                suitableFields.add(candidateField);
+            }
+        }
+
+        return suitableFields;
+    }
+
     /**
      * Visit dim side to judge whether dim side has filter condition and whether dim side's source
      * table scan is non partitioned scan.
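
The helper preserves candidate order and simply drops fields the source does not accept. A standalone rendering of the same intersection semantics (plain Java for illustration, with field names borrowed from the tests; this is not planner code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SuitableFieldsDemo {

        /** Mirrors getSuitableDynamicFilteringFieldsInFactSide above. */
        static List<String> suitableFields(
                List<String> candidateFields, List<String> acceptedFilterFields) {
            if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
                return new ArrayList<>();
            }
            List<String> suitable = new ArrayList<>();
            for (String field : candidateFields) {
                if (acceptedFilterFields.contains(field)) {
                    suitable.add(field);
                }
            }
            return suitable;
        }

        public static void main(String[] args) {
            // Join keys resolved to fact-scan field names vs. accepted partition keys.
            List<String> candidates = Arrays.asList("fact_date_sk", "id");
            List<String> accepted = Collections.singletonList("fact_date_sk");
            System.out.println(suitableFields(candidates, accepted)); // [fact_date_sk]
        }
    }
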
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 949b9f57e57..5254750a5f0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -1302,22 +1302,13 @@ public final class TestValuesTableFactory
         }
 
         @Override
-        public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-            if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
-                checkArgument(!candidateFilterFields.isEmpty());
-                acceptedPartitionFilterFields = new ArrayList<>();
-                for (String field : candidateFilterFields) {
-                    if (dynamicFilteringFields.contains(field)) {
-                        acceptedPartitionFilterFields.add(field);
-                    }
-                }
+        public List<String> listAcceptedFilterFields() {
+            return new ArrayList<>(dynamicFilteringFields);
+        }
 
-                return new ArrayList<>(acceptedPartitionFilterFields);
-            } else {
-                throw new UnsupportedOperationException(
-                        "Should adding dynamic filtering fields by adding factor"
-                                + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
-            }
+        @Override
+        public void applyDynamicFiltering(List<String> candidateFilterFields) {
+            acceptedPartitionFilterFields = candidateFilterFields;
         }
     }
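
For tests, the accepted filter fields of the values connector come from the 'dynamic-filtering-fields' option (the 'a;b' format is taken from the error message removed above). A hedged sketch of such a table declaration, assuming the test-scoped values connector is on the classpath:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DynamicFilteringFieldsDemo {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // listAcceptedFilterFields() of this source should then report [fact_date_sk].
            tableEnv.executeSql(
                    "CREATE TABLE fact_part (\n"
                            + "  id BIGINT,\n"
                            + "  fact_date_sk BIGINT\n"
                            + ") WITH (\n"
                            + " 'connector' = 'values',\n"
                            + " 'bounded' = 'true',\n"
                            + " 'dynamic-filtering-fields' = 'fact_date_sk'\n"
                            + ")");
        }
    }
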
 


[flink] 02/02: [FLINK-28993][table-planner] Fix adjusting join cost for dpp query pattern error

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ed1deb351bd7a9d4fbe46dd9e9cf40b08b97fd9
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Tue Aug 23 11:55:39 2022 +0800

    [FLINK-28993][table-planner] Fix adjusting join cost for dpp query pattern error
    
    This closes #20596
---
 .../utils/DynamicPartitionPruningUtils.java        |   9 +-
 .../planner/plan/metadata/FlinkRelMdRowCount.scala |   4 +-
 .../batch/DynamicPartitionPruningRuleTest.java     | 133 ++++++++++++++--
 .../batch/DynamicPartitionPruningRuleTest.xml      | 171 ++++++++++++++++-----
 4 files changed, 256 insertions(+), 61 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
index ffad5054d5d..d569772b2e1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java
@@ -171,7 +171,14 @@ public class DynamicPartitionPruningUtils {
                     joinKeys.stream()
                             .map(i -> scan.getRowType().getFieldNames().get(i))
                             .collect(Collectors.toList());
-            factSideFactors.isSuitableFactScanSource = !candidateFields.isEmpty();
+            if (candidateFields.isEmpty()) {
+                factSideFactors.isSuitableFactScanSource = false;
+                return;
+            }
+
+            factSideFactors.isSuitableFactScanSource =
+                    !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields)
+                            .isEmpty();
         } else if (rel instanceof HepRelVertex) {
             visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
         } else if (rel instanceof Exchange || rel instanceof Filter) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
index cfb3ee08cdd..b20868b1835 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
@@ -369,14 +369,14 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
       } else {
         leftRowCount * selectivityOfNonEquiPred
       }
-      return outputRowCount
+      return outputRowCount * dynamicPartitionPruningFactor
     }
 
     // if joinCondition has no ndv stats and no uniqueKeys stats,
     // rowCount = (leftRowCount + rightRowCount) * join condition selectivity
     val crossJoin = copyJoinWithNewCondition(join, rexBuilder.makeLiteral(true))
     val selectivity = fmq.getSelectivity(crossJoin, condition)
-    (leftRowCount + rightRowCount) * selectivity
+    (leftRowCount + rightRowCount) * selectivity * dynamicPartitionPruningFactor
   }
 
   private def copyJoinWithNewCondition(join: Join, newCondition: RexNode): Join = {
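
The practical effect of multiplying the fallback estimates by dynamicPartitionPruningFactor is easiest to see with numbers. A hedged arithmetic sketch (the factor value and row counts are illustrative; the real factor is defined elsewhere in the planner):

    public class DppRowCountSketch {
        public static void main(String[] args) {
            double leftRowCount = 10_000;  // fact side
            double rightRowCount = 10;     // dim side
            double selectivity = 0.1;      // assumed join-condition selectivity
            double dppFactor = 0.5;        // illustrative dynamic partition pruning factor

            double before = (leftRowCount + rightRowCount) * selectivity;            // 1001.0
            double after = (leftRowCount + rightRowCount) * selectivity * dppFactor; // 500.5
            // The discounted estimate makes the DPP-eligible join look cheaper, steering
            // the join reorderer toward plans where the fact table sits next to the dim side.
            System.out.printf("before fix: %.1f, after fix: %.1f%n", before, after);
        }
    }
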
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
index 51efc661996..0a7f0c7bc9d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
@@ -22,6 +22,9 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.planner.factories.TestValuesCatalog;
 import org.apache.flink.table.planner.utils.BatchTableTestUtil;
@@ -30,6 +33,12 @@ import org.apache.flink.table.planner.utils.TableTestBase;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Test for rules that extend {@link DynamicPartitionPruningRule} to create {@link
  * org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan}.
@@ -428,12 +437,12 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
     // --------------------------dpp factor test ---------------------------------------------
 
     @Test
-    public void testDPPFactorToReorderTable() {
+    public void testDPPFactorToReorderTableWithoutStats() {
         // When there are several joins and the fact table is not directly adjacent to the dim
         // table, the dynamic partition pruning factor will try its best to reorder the join
         // relations so that the fact table becomes adjacent to the dim table.
-        String ddl1 =
-                "CREATE TABLE test_database.sales (\n"
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
                         + "  amount BIGINT,\n"
                         + "  price BIGINT\n"
@@ -441,9 +450,23 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
                         + " 'connector' = 'values',\n"
                         + " 'bounded' = 'true'\n"
                         + ")";
-        util.tableEnv().executeSql(ddl1);
+        util.tableEnv().executeSql(ddl);
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Join reorder need open.
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
+
+        String query =
+                "Select * from fact_part, item, dim"
+                        + " where fact_part.fact_date_sk = dim.dim_date_sk"
+                        + " and fact_part.id = item.id"
+                        + " and dim.id = item.id "
+                        + " and dim.price < 500 and dim.price > 300";
+        util.verifyRelPlan(query);
+    }
 
-        String ddl2 =
+    @Test
+    public void testDPPFactorToReorderTableWithStats() throws TableNotExistException {
+        String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
                         + "  amount BIGINT,\n"
@@ -452,26 +475,53 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
                         + " 'connector' = 'values',\n"
                         + " 'bounded' = 'true'\n"
                         + ")";
-        util.tableEnv().executeSql(ddl2);
+        util.tableEnv().executeSql(ddl);
         TableConfig tableConfig = util.tableEnv().getConfig();
         // Join reorder needs to be enabled.
         tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
 
+        // Alter table stats and column stats.
+        CatalogTableStatistics tableStatistics = new CatalogTableStatistics(10, 10, 10, 10);
+        catalog.alterTableStatistics(
+                new ObjectPath("test_database", "dim"), tableStatistics, false);
+        catalog.alterTableColumnStatistics(
+                new ObjectPath("test_database", "dim"),
+                createJoinKeyColumnStats(
+                        Arrays.asList("dim_date_sk", "id", "price"), 10L, 1000L, 5L, 10L),
+                false);
+
+        // Table item has the same stats as table dim, but item does not meet the dpp pattern.
+        tableStatistics = new CatalogTableStatistics(10, 10, 10, 10);
+        catalog.alterTableStatistics(
+                new ObjectPath("test_database", "item"), tableStatistics, false);
+        catalog.alterTableColumnStatistics(
+                new ObjectPath("test_database", "item"),
+                createJoinKeyColumnStats(Collections.singletonList("id"), 10L, 1000L, 5L, 10L),
+                false);
+
+        tableStatistics = new CatalogTableStatistics(10000, 10000, 10000, 10000);
+        catalog.alterTableStatistics(
+                new ObjectPath("test_database", "fact_part"), tableStatistics, false);
+        catalog.alterTableColumnStatistics(
+                new ObjectPath("test_database", "fact_part"),
+                createJoinKeyColumnStats(
+                        Arrays.asList("fact_date_sk", "id"), 100L, 1000000L, 8500L, 9800L),
+                false);
+
         String query =
-                "Select * from fact_part, item, sales, dim"
+                "Select * from fact_part, item, dim"
                         + " where fact_part.fact_date_sk = dim.dim_date_sk"
                         + " and fact_part.id = item.id"
-                        + " and fact_part.id = sales.id"
                         + " and dim.id = item.id "
                         + " and dim.price < 500 and dim.price > 300";
         util.verifyRelPlan(query);
     }
 
     @Test
-    public void testDPPFactorWithDimSideJoinKeyChanged() {
-        // If partition keys changed in dim side. DPP factor will not works.
-        String ddl1 =
-                "CREATE TABLE test_database.sales (\n"
+    public void testDPPFactorWithFactSideJoinKeyChanged() {
+        // If the partition keys are changed on the fact side, the DPP factor will not work.
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
                         + "  amount BIGINT,\n"
                         + "  price BIGINT\n"
@@ -479,9 +529,23 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
                         + " 'connector' = 'values',\n"
                         + " 'bounded' = 'true'\n"
                         + ")";
-        util.tableEnv().executeSql(ddl1);
+        util.tableEnv().executeSql(ddl);
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Join reorder needs to be enabled.
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
 
-        String ddl2 =
+        String query =
+                "Select * from (select fact_date_sk + 1 as fact_date_sk, id from fact_part) fact_part1 join item on "
+                        + "fact_part1.id = item.id"
+                        + " join dim on fact_part1.fact_date_sk = dim.dim_date_sk"
+                        + " where dim.price < 500 and dim.price > 300";
+        util.verifyRelPlan(query);
+    }
+
+    @Test
+    public void testDPPFactorWithDimSideJoinKeyChanged() {
+        // Even though the partition keys are changed on the dim side, the DPP factor will still work.
+        String ddl =
                 "CREATE TABLE test_database.item (\n"
                         + "  id BIGINT,\n"
                         + "  amount BIGINT,\n"
@@ -490,17 +554,54 @@ public class DynamicPartitionPruningRuleTest extends TableTestBase {
                         + " 'connector' = 'values',\n"
                         + " 'bounded' = 'true'\n"
                         + ")";
-        util.tableEnv().executeSql(ddl2);
+        util.tableEnv().executeSql(ddl);
         TableConfig tableConfig = util.tableEnv().getConfig();
         // Join reorder needs to be enabled.
         tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
 
         String query =
                 "Select * from fact_part join item on fact_part.id = item.id"
-                        + " join sales on fact_part.id = sales.id"
                         + " join (select dim_date_sk + 1 as dim_date_sk, price from dim) dim1"
                         + " on fact_part.fact_date_sk = dim1.dim_date_sk"
                         + " where dim1.price < 500 and dim1.price > 300";
         util.verifyRelPlan(query);
     }
+
+    @Test
+    public void testDPPFactorWithJoinKeysNotIncludePartitionKeys() {
+        // If the join keys between the partition table and the dim table do not include the
+        // partition keys, the dpp factor will not be adjusted and dpp will not succeed.
+        String ddl =
+                "CREATE TABLE test_database.item (\n"
+                        + "  id BIGINT,\n"
+                        + "  amount BIGINT,\n"
+                        + "  price BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true'\n"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        // Join reorder needs to be enabled.
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
+
+        String query =
+                "Select * from fact_part, item, dim"
+                        + " where fact_part.id = dim.id"
+                        + " and fact_part.id = item.id"
+                        + " and dim.id = item.id "
+                        + " and dim.price < 500 and dim.price > 300";
+        util.verifyRelPlan(query);
+    }
+
+    private CatalogColumnStatistics createJoinKeyColumnStats(
+            List<String> columnNames, Long min, Long max, Long ndv, Long nullCount) {
+        CatalogColumnStatisticsDataLong longColStats =
+                new CatalogColumnStatisticsDataLong(min, max, ndv, nullCount);
+        Map<String, CatalogColumnStatisticsDataBase> colStatsMap = new HashMap<>(1);
+        for (String columnName : columnNames) {
+            colStatsMap.put(columnName, longColStats);
+        }
+        return new CatalogColumnStatistics(colStatsMap);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
index b1951732632..b9d9dfc726b 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml
@@ -237,39 +237,66 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDPPFactorToReorderTable">
+  <TestCase name="testDPPFactorToReorderTableWithoutStats">
     <Resource name="sql">
-      <![CDATA[Select * from fact_part, item, sales, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and fact_part.id = sales.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], amount1=[$9], price1=[$10], id2=[$11], male=[$12], amount2=[$13], price2=[$14], dim_date_sk=[$15])
-+- LogicalFilter(condition=[AND(=($4, $15), =($0, $5), =($0, $8), =($11, $5), <($14, 500), >($14, 300))])
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
    +- LogicalJoin(condition=[true], joinType=[inner])
       :- LogicalJoin(condition=[true], joinType=[inner])
-      :  :- LogicalJoin(condition=[true], joinType=[inner])
-      :  :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
-      :  :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
-      :  +- LogicalTableScan(table=[[testCatalog, test_database, sales]])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
       +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[id00 AS id, name, amount00 AS amount, price00 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id0 AS id1, amount0 AS amount1, price0 AS price1, id000 AS id2, male, amount000 AS amount2, price000 AS price2, dim_date_sk])
-+- HashJoin(joinType=[InnerJoin], where=[=(id, id00)], select=[id, amount, price, id0, amount0, price0, id00, name, amount00, price00, fact_date_sk, id000, male, amount000, price000, dim_date_sk], build=[right])
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
    :- Exchange(distribution=[hash[id]])
    :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
-   +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+   +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
       :- Exchange(distribution=[hash[id]])
-      :  +- TableSourceScan(table=[[testCatalog, test_database, sales]], fields=[id, amount, price])
-      +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
-         :- Exchange(distribution=[hash[id]])
-         :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-         :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-         :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
-         :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
-         +- Exchange(distribution=[hash[id]])
+      :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+      :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+      :        +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+      :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+      +- Exchange(distribution=[hash[id]])
+         +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+            +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDPPFactorToReorderTableWithStats">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.fact_date_sk = dim.dim_date_sk and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk])
++- NestedLoopJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right])
+   :- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- Exchange(distribution=[broadcast])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right])
+         :- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+         :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+         :     +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+         :        +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+         +- Exchange(distribution=[broadcast])
             +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
                +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
 ]]>
@@ -277,38 +304,98 @@ Calc(select=[id00 AS id, name, amount00 AS amount, price00 AS price, fact_date_s
   </TestCase>
   <TestCase name="testDPPFactorWithDimSideJoinKeyChanged">
     <Resource name="sql">
-      <![CDATA[Select * from fact_part join item on fact_part.id = item.id join sales on fact_part.id = sales.id join (select dim_date_sk + 1 as dim_date_sk, price from dim) dim1 on fact_part.fact_date_sk = dim1.dim_date_sk where dim1.price < 500 and dim1.price > 300]]>
+      <![CDATA[Select * from fact_part join item on fact_part.id = item.id join (select dim_date_sk + 1 as dim_date_sk, price from dim) dim1 on fact_part.fact_date_sk = dim1.dim_date_sk where dim1.price < 500 and dim1.price > 300]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], amount1=[$9], price1=[$10], dim_date_sk=[$11], price2=[$12])
-+- LogicalFilter(condition=[AND(<($12, 500), >($12, 300))])
-   +- LogicalJoin(condition=[=($4, $11)], joinType=[inner])
-      :- LogicalJoin(condition=[=($0, $8)], joinType=[inner])
-      :  :- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
-      :  :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
-      :  :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
-      :  +- LogicalTableScan(table=[[testCatalog, test_database, sales]])
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], dim_date_sk=[$8], price1=[$9])
++- LogicalFilter(condition=[AND(<($9, 500), >($9, 300))])
+   +- LogicalJoin(condition=[=($4, $8)], joinType=[inner])
+      :- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
       +- LogicalProject(dim_date_sk=[+($4, 1)], price=[$3])
          +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[id, name, amount, price0 AS price, fact_date_sk, id0, amount0, price00 AS price0, id00 AS id1, amount00 AS amount1, price000 AS price1, dim_date_sk, price AS price2])
-+- HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[dim_date_sk, price, id, name, amount, price0, fact_date_sk, id0, amount0, price00, id00, amount00, price000], build=[left])
-   :- Exchange(distribution=[hash[dim_date_sk]])
-   :  +- Calc(select=[+(dim_date_sk, 1) AS dim_date_sk, price], where=[SEARCH(price, Sarg[(300..500)])])
-   :     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
-   +- Exchange(distribution=[hash[fact_date_sk]])
-      +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id00, amount00, price00], build=[right])
-         :- Exchange(distribution=[hash[id]])
-         :  +- TableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-         +- HashJoin(joinType=[InnerJoin], where=[=(id0, id)], select=[id, amount, price, id0, amount0, price0], build=[right])
-            :- Exchange(distribution=[hash[id]])
-            :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
-            +- Exchange(distribution=[hash[id]])
-               +- TableSourceScan(table=[[testCatalog, test_database, sales]], fields=[id, amount, price])
+Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, dim_date_sk, price00 AS price1])
++- HashJoin(joinType=[InnerJoin], where=[=(id0, id)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, dim_date_sk, price00], build=[right])
+   :- Exchange(distribution=[hash[id]])
+   :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- Exchange(distribution=[hash[id]])
+      +- HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, name, amount, price, fact_date_sk, dim_date_sk, price0], build=[right])
+         :- Exchange(distribution=[hash[fact_date_sk]])
+         :  +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
+         :     +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+         :        +- Calc(select=[+(dim_date_sk, 1) AS dim_date_sk, price], where=[SEARCH(price, Sarg[(300..500)])])
+         :           +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+         +- Exchange(distribution=[hash[dim_date_sk]])
+            +- Calc(select=[+(dim_date_sk, 1) AS dim_date_sk, price], where=[SEARCH(price, Sarg[(300..500)])])
+               +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDPPFactorWithFactSideJoinKeyChanged">
+    <Resource name="sql">
+      <![CDATA[Select * from (select fact_date_sk + 1 as fact_date_sk, id from fact_part) fact_part1 join item on fact_part1.id = item.id join dim on fact_part1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(fact_date_sk=[$0], id=[$1], id0=[$2], amount=[$3], price=[$4], id1=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(<($8, 500), >($8, 300))])
+   +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      :- LogicalJoin(condition=[=($1, $2)], joinType=[inner])
+      :  :- LogicalProject(fact_date_sk=[+($4, 1)], id=[$0])
+      :  :  +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[fact_date_sk, id0 AS id, id AS id0, amount, price, id00 AS id1, male, amount0, price0, dim_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(id0, id)], select=[id, amount, price, fact_date_sk, id0, id00, male, amount0, price0, dim_date_sk], build=[right])
+   :- Exchange(distribution=[hash[id]])
+   :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- Exchange(distribution=[hash[id]])
+      +- HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[fact_date_sk, id, id0, male, amount, price, dim_date_sk], build=[right])
+         :- Exchange(distribution=[hash[fact_date_sk]])
+         :  +- Calc(select=[+(fact_date_sk, 1) AS fact_date_sk, id])
+         :     +- TableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk, id], metadata=[]]], fields=[fact_date_sk, id])
+         +- Exchange(distribution=[hash[dim_date_sk]])
+            +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])])
+               +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDPPFactorWithJoinKeysNotIncludePartitionKeys">
+    <Resource name="sql">
+      <![CDATA[Select * from fact_part, item, dim where fact_part.id = dim.id and fact_part.id = item.id and dim.id = item.id  and dim.price < 500 and dim.price > 300]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($0, $8), =($0, $5), =($8, $5), <($11, 500), >($11, 300))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, item]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, price, fact_date_sk, id0, amount0, price0, id00, male, amount00, price00, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[id]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(id0, id)], select=[id, amount, price, id0, male, amount0, price0, dim_date_sk], build=[right])
+   :- Exchange(distribution=[hash[id]])
+   :  +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price])
+   +- Exchange(distribution=[hash[id]])
+      +- Calc(select=[id, male, amount, price, dim_date_sk], where=[AND(SEARCH(price, Sarg[(300..500)]), IS NOT NULL(id))])
+         +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
 ]]>
     </Resource>
   </TestCase>