Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/26 06:53:20 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #21489: [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns

godfreyhe commented on code in PR #21489:
URL: https://github.com/apache/flink/pull/21489#discussion_r1057087887


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala:
##########
@@ -59,6 +59,10 @@ class FlinkChainedProgram[OC <: FlinkOptimizeContext]
         val result = program.optimize(input, context)
         val end = System.currentTimeMillis()
 
+        println(

Review Comment:
   remove the useless code



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ */
+public class FlinkDynamicPartitionPruningConverterProgram

Review Comment:
   can you explain why we need a program instead of a planner rule here?
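
   For context, the rewrite described in the class Javadoc above corresponds
   to a query shape roughly like the sketch below. The table and column names
   (`fact`, `dim`, `price`) are illustrative assumptions, not part of the PR:

       import org.apache.flink.table.api.EnvironmentSettings;
       import org.apache.flink.table.api.TableEnvironment;
       import org.apache.flink.table.api.config.OptimizerConfigOptions;

       public class DppQueryShapeSketch {
           public static void main(String[] args) {
               TableEnvironment env =
                       TableEnvironment.create(EnvironmentSettings.inBatchMode());
               // The option the Javadoc requires to be true for the rewrite to fire.
               env.getConfig()
                       .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
               // Assumes `fact` (partitioned on fact_partition_key) and `dim` are
               // already registered in the catalog. The filter on the dim side is
               // what gives the planner a reduced set of dim_key values to prune
               // fact partitions with at runtime.
               env.executeSql(
                               "SELECT * FROM fact JOIN dim"
                                       + " ON fact.fact_partition_key = dim.dim_key"
                                       + " WHERE dim.price < 100")
                       .print();
           }
       }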



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -60,138 +68,153 @@
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * we need not consider the join keys on the dim side, as they are already handled by the
+     * dynamic partition pruning rule. If joinKeys is not null, we need to judge whether the join
+     * keys are changed on the dim side; if they are changed, this RelNode is not the dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDimSide(RelNode rel) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors);
+        return dimSideFactors.isDimSide();
     }
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
+    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(

Review Comment:
   can be marked as private



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala:
##########
@@ -40,6 +40,7 @@ object FlinkBatchProgram {
   val TIME_INDICATOR = "time_indicator"
   val PHYSICAL = "physical"
   val PHYSICAL_REWRITE = "physical_rewrite"
+  val DYNAMIC_PARTITION_PRUNING_CONVERTER = "dynamic_partition_pruning_converter"

Review Comment:
   dynamic_partition_pruning



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -201,33 +224,128 @@ private static void visitFactSide(
                             .collect(Collectors.toList());
             ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
             if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+                return rel;
             }
 
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
+            return rel.copy(
+                    rel.getTraitSet(),
+                    Collections.singletonList(
+                            convertDppFactSide(
+                                    rel.getInput(0),
+                                    inputJoinKeys,
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Join) {
+            Join currentJoin = (Join) rel;
+            return currentJoin.copy(
+                    currentJoin.getTraitSet(),
+                    Arrays.asList(
+                            convertDppFactSide(
+                                    currentJoin.getLeft(),
+                                    getInputIndices(currentJoin, joinKeys, true),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join),
+                            convertDppFactSide(
+                                    currentJoin.getRight(),
+                                    getInputIndices(currentJoin, joinKeys, false),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Union) {
+            Union union = (Union) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : union.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join));
+            }
+            return union.copy(union.getTraitSet(), newInputs, union.all);
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : agg.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input,
+                                getInputIndices(agg, input, joinKeys),
+                                dimSide,
+                                dimSideJoinKey,
+                                factSideFactors,
+                                join));
+            }
+
+            return agg.copy(agg.getTraitSet(), newInputs);
         }
+
+        return rel;
     }
 
-    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
-            DynamicTableSource tableSource, List<String> candidateFields) {
-        List<String> acceptedFilterFields =
-                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
-        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
-            return new ArrayList<>();
+    private static boolean isSuitableJoin(Join join) {
+        // Now dynamic partition pruning supports inner, semi, left and right joins,
+        // but semi joins currently cannot participate in join reordering.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.SEMI
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
         }
 
-        List<String> suitableFields = new ArrayList<>();
-        // If candidateField not in acceptedFilterFields means dpp rule will not be matched,
-        // because we can not prune any partitions according to non-accepted filter fields
-        // provided by partition table source.
-        for (String candidateField : candidateFields) {
-            if (acceptedFilterFields.contains(candidateField)) {
-                suitableFields.add(candidateField);
+        JoinInfo joinInfo = join.analyzeCondition();
+        return !joinInfo.leftKeys.isEmpty();
+    }
+
+    private static ImmutableIntList getInputIndices(
+            BatchPhysicalGroupAggregateBase agg, RelNode aggInput, ImmutableIntList joinKeys) {
+        int[] indexMap = new int[aggInput.getRowType().getFieldCount()];
+        int[] grouping = agg.grouping();
+        if (grouping.length == 0) {
+            return joinKeys;
+        }
+        int beginIndex = grouping[0] - 1;
+        for (int i = 0; i < indexMap.length; i++) {
+            indexMap[i] = i;
+        }
+
+        System.arraycopy(grouping, 0, indexMap, 0, grouping.length);
+        if (beginIndex >= 0) {

Review Comment:
   why do we check `beginIndex >= 0`?
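
   To make the question concrete, below is a small, self-contained sketch of
   the index mapping the quoted hunk builds (only the part shown above; the
   `grouping` values are assumed for illustration):

       import java.util.Arrays;

       public class GroupingIndexMapSketch {
           public static void main(String[] args) {
               int fieldCount = 5;      // pretend the agg input has 5 fields
               int[] grouping = {2, 3}; // pretend the agg groups on input columns 2 and 3

               int[] indexMap = new int[fieldCount];
               for (int i = 0; i < indexMap.length; i++) {
                   indexMap[i] = i; // start from the identity mapping {0,1,2,3,4}
               }
               // Overwrite the prefix with the grouping columns, as the quoted code does.
               System.arraycopy(grouping, 0, indexMap, 0, grouping.length);
               System.out.println(Arrays.toString(indexMap)); // [2, 3, 2, 3, 4]

               // beginIndex is negative only when the first grouping key is input
               // column 0, which appears to be what the `beginIndex >= 0` check probes.
               int beginIndex = grouping[0] - 1;
               System.out.println("beginIndex = " + beginIndex); // 1
           }
       }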



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -275,6 +400,19 @@ && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
                 dimSideFactors.hasFilter = true;
             }
             visitDimSide(rel.getInput(0), dimSideFactors);
+        } else if (rel instanceof Join) {
+            Join join = (Join) rel;
+            visitDimSide(join.getLeft(), dimSideFactors);
+            visitDimSide(join.getRight(), dimSideFactors);
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            for (RelNode input : rel.getInputs()) {

Review Comment:
   there is only one input for BatchPhysicalGroupAggregateBase



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ */
+public class FlinkDynamicPartitionPruningConverterProgram
+        implements FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(RelNode rel) {
+                        if (!(rel instanceof Join)) {
+                            List<RelNode> newInputs = new ArrayList<>();
+                            for (RelNode input : rel.getInputs()) {
+                                RelNode newInput = input.accept(this);
+                                newInputs.add(newInput);
+                            }
+                            return rel.copy(rel.getTraitSet(), newInputs);
+                        }
+                        Join join = (Join) rel;
+                        if (!ShortcutUtils.unwrapContext(join)
+                                .getTableConfig()
+                                .get(
+                                        OptimizerConfigOptions
+                                                .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {

Review Comment:
   the check should be at the beginning of the `optimize` method
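
   A sketch of the suggested shape, assuming `ShortcutUtils.unwrapContext` can
   be applied to the root node just as it is applied to the join; the class
   name is illustrative:

       import org.apache.calcite.rel.RelNode;
       import org.apache.flink.table.api.config.OptimizerConfigOptions;
       import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
       import org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
       import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
       import org.apache.flink.table.planner.utils.ShortcutUtils;

       public class EarlyBailSketch implements FlinkOptimizeProgram<BatchOptimizeContext> {
           @Override
           public RelNode optimize(RelNode root, BatchOptimizeContext context) {
               // Read the config once, before walking the plan, instead of
               // re-reading it inside the shuttle for every Join visited.
               if (!ShortcutUtils.unwrapContext(root)
                       .getTableConfig()
                       .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
                   return root;
               }
               // The Join-rewriting shuttle from the PR would go here; a plain
               // DefaultRelShuttle just copies the tree unchanged.
               return root.accept(new DefaultRelShuttle());
           }
       }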



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -258,7 +376,14 @@ private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors)
                 }
             }
             CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
-            dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned();
+            if (catalogTable.isPartitioned()) {
+                dimSideFactors.hasPartitionedScan = true;
+                return;
+            }
+
+            if (!dimSideFactors.setTables(table.contextResolvedTable())) {
+                return;

Review Comment:
   the return is useless



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -60,138 +68,153 @@
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * we need not consider the join keys on the dim side, as they are already handled by the
+     * dynamic partition pruning rule. If joinKeys is not null, we need to judge whether the join
+     * keys are changed on the dim side; if they are changed, this RelNode is not the dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDimSide(RelNode rel) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors);
+        return dimSideFactors.isDimSide();
     }
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
+    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+            DynamicTableSource tableSource, List<String> candidateFields) {
+        List<String> acceptedFilterFields =
+                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+            return new ArrayList<>();
         }
 
-        JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
+        List<String> suitableFields = new ArrayList<>();
+        // If a candidateField is not in acceptedFilterFields, the dpp rule will not be matched,
+        // because we cannot prune any partitions according to non-accepted filter fields
+        // provided by the partition table source.
+        for (String candidateField : candidateFields) {
+            if (acceptedFilterFields.contains(candidateField)) {
+                suitableFields.add(candidateField);
+            }
         }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
-    }
 
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
+        return suitableFields;
     }
 
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey,
+            Join join) {
         DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
+        RelNode newRel =
+                convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join);
+        return Tuple2.of(factSideFactors.isChanged, newRel);
     }
 
-    /**
-     * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
-     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
-     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
-     * changed, this RelNode is not dim side.
-     */
-    private static boolean isDimSide(RelNode rel) {
-        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
-        visitDimSide(rel, dimSideFactors);
-        return dimSideFactors.isDimSide();
-    }
-
-    /**
-     * Visit fact side to judge whether fact side has partition table, partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
+    public static RelNode convertDppFactSide(

Review Comment:
   can be marked as private



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -60,138 +68,153 @@
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * we need not consider the join keys on the dim side, as they are already handled by the
+     * dynamic partition pruning rule. If joinKeys is not null, we need to judge whether the join
+     * keys are changed on the dim side; if they are changed, this RelNode is not the dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDimSide(RelNode rel) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors);
+        return dimSideFactors.isDimSide();
     }
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
+    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+            DynamicTableSource tableSource, List<String> candidateFields) {
+        List<String> acceptedFilterFields =
+                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+            return new ArrayList<>();
         }
 
-        JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
+        List<String> suitableFields = new ArrayList<>();
+        // If a candidateField is not in acceptedFilterFields, the dpp rule will not be matched,
+        // because we cannot prune any partitions according to non-accepted filter fields
+        // provided by the partition table source.
+        for (String candidateField : candidateFields) {
+            if (acceptedFilterFields.contains(candidateField)) {
+                suitableFields.add(candidateField);
+            }
         }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
-    }
 
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
+        return suitableFields;
     }
 
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey,
+            Join join) {
         DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
+        RelNode newRel =
+                convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join);
+        return Tuple2.of(factSideFactors.isChanged, newRel);
     }
 
-    /**
-     * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
-     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
-     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
-     * changed, this RelNode is not dim side.
-     */
-    private static boolean isDimSide(RelNode rel) {
-        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
-        visitDimSide(rel, dimSideFactors);
-        return dimSideFactors.isDimSide();
-    }
-
-    /**
-     * Visit fact side to judge whether fact side has partition table, partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
+    public static RelNode convertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey,
+            DppFactSideFactors factSideFactors,
+            Join join) {
         if (rel instanceof TableScan) {
             TableScan scan = (TableScan) rel;
             if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
                 // rule applied
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
             if (tableSourceTable == null) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable();
             List<String> partitionKeys = catalogTable.getPartitionKeys();
             if (partitionKeys.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             DynamicTableSource tableSource = tableSourceTable.tableSource();
             if (!(tableSource instanceof SupportsDynamicFiltering)
                     || !(tableSource instanceof ScanTableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             if (!isNewSource((ScanTableSource) tableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
 
             List<String> candidateFields =
                     joinKeys.stream()
                             .map(i -> scan.getRowType().getFieldNames().get(i))
                             .collect(Collectors.toList());
             if (candidateFields.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
 
-            factSideFactors.isSuitableFactScanSource =
-                    !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields)
-                            .isEmpty();
-        } else if (rel instanceof HepRelVertex) {
-            visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
+            List<String> acceptedFilterFields =
+                    getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
+
+            if (acceptedFilterFields.size() == 0) {
+                return rel;
+            }
+
+            // Apply suitable accepted filter fields to source.
+            ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
+
+            List<Integer> acceptedFieldIndices =
+                    acceptedFilterFields.stream()
+                            .map(f -> scan.getRowType().getFieldNames().indexOf(f))
+                            .collect(Collectors.toList());
+            List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
+            for (int i = 0; i < joinKeys.size(); ++i) {
+                if (acceptedFieldIndices.contains(joinKeys.get(i))) {
+                    dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
+                }
+            }
+
+            BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                    createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
+
+            factSideFactors.isChanged = true;
+
+            if (!isSuitableJoin(join)) {

Review Comment:
   this validation can be moved to the `Join` branch



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -201,33 +224,128 @@ private static void visitFactSide(
                             .collect(Collectors.toList());
             ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
             if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+                return rel;
             }
 
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
+            return rel.copy(
+                    rel.getTraitSet(),
+                    Collections.singletonList(
+                            convertDppFactSide(
+                                    rel.getInput(0),
+                                    inputJoinKeys,
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Join) {
+            Join currentJoin = (Join) rel;
+            return currentJoin.copy(
+                    currentJoin.getTraitSet(),
+                    Arrays.asList(
+                            convertDppFactSide(
+                                    currentJoin.getLeft(),
+                                    getInputIndices(currentJoin, joinKeys, true),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join),
+                            convertDppFactSide(
+                                    currentJoin.getRight(),
+                                    getInputIndices(currentJoin, joinKeys, false),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Union) {
+            Union union = (Union) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : union.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join));
+            }
+            return union.copy(union.getTraitSet(), newInputs, union.all);
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : agg.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input,
+                                getInputIndices(agg, input, joinKeys),
+                                dimSide,
+                                dimSideJoinKey,
+                                factSideFactors,
+                                join));
+            }
+
+            return agg.copy(agg.getTraitSet(), newInputs);
         }

Review Comment:
   should we throw an exception for the `else` branch?
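
   A minimal sketch of the fail-fast alternative the comment hints at; whether
   throwing is the right behavior is exactly the open question, and the use of
   `TableException` here is an assumption:

       import org.apache.calcite.rel.RelNode;
       import org.apache.flink.table.api.TableException;

       final class FactSideFallbackSketch {
           // Instead of silently returning `rel` unchanged for unrecognized node
           // types, the final `else` branch could surface them explicitly.
           static RelNode rejectUnexpected(RelNode rel) {
               throw new TableException(
                       "Unsupported RelNode in dynamic partition pruning fact side: "
                               + rel.getClass().getSimpleName());
           }
       }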



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ */
+public class FlinkDynamicPartitionPruningConverterProgram
+        implements FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(RelNode rel) {
+                        if (!(rel instanceof Join)) {
+                            List<RelNode> newInputs = new ArrayList<>();
+                            for (RelNode input : rel.getInputs()) {
+                                RelNode newInput = input.accept(this);
+                                newInputs.add(newInput);
+                            }
+                            return rel.copy(rel.getTraitSet(), newInputs);
+                        }
+                        Join join = (Join) rel;
+                        if (!ShortcutUtils.unwrapContext(join)
+                                .getTableConfig()
+                                .get(
+                                        OptimizerConfigOptions
+                                                .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+                            return join;
+                        }
+
+                        JoinInfo joinInfo = join.analyzeCondition();
+                        RelNode leftSide = join.getLeft();
+                        RelNode rightSide = join.getRight();
+                        Join newJoin = join;
+                        boolean changed = false;
+                        if (DynamicPartitionPruningUtils.isDimSide(leftSide)) {
+                            if (join.getJoinType() != JoinRelType.RIGHT) {
+                                Tuple2<Boolean, RelNode> relTuple =
+                                        DynamicPartitionPruningUtils
+                                                .canConvertAndConvertDppFactSide(
+                                                        rightSide,
+                                                        joinInfo.rightKeys,
+                                                        leftSide,
+                                                        joinInfo.leftKeys,
+                                                        join);
+                                changed = relTuple.f0;
+                                newJoin =
+                                        join.copy(
+                                                join.getTraitSet(),
+                                                Arrays.asList(leftSide, relTuple.f1.accept(this)));
+                            }
+                        }
+                        if (DynamicPartitionPruningUtils.isDimSide(rightSide)) {

Review Comment:
   if the leftSide is the dim side, do we need to check the right side again?
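
   One possible reading of the question, sketched below: make the two probes
   mutually exclusive, so the right side is not re-tested once the left side
   has already qualified. This is a hypothetical restructuring, not the PR's
   code:

       import org.apache.calcite.rel.RelNode;
       import org.apache.calcite.rel.core.Join;
       import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;

       final class DimSideDispatchSketch {
           static void dispatch(Join join) {
               RelNode leftSide = join.getLeft();
               RelNode rightSide = join.getRight();
               if (DynamicPartitionPruningUtils.isDimSide(leftSide)) {
                   // Treat rightSide as the fact side and try to convert it; a
                   // dim/dim join has no partitioned fact scan to rewrite anyway.
               } else if (DynamicPartitionPruningUtils.isDimSide(rightSide)) {
                   // Treat leftSide as the fact side and try to convert it.
               }
           }
       }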



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase, which can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partition source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ */
+public class FlinkDynamicPartitionPruningConverterProgram

Review Comment:
   FlinkDynamicPartitionPruningProgram



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org