You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 13:52:11 UTC

[flink] 02/02: [FLINK-28710][table-planner] Supports dynamic filtering execution

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8011f139dfa86ad9abf764994a572295337fdc5b
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Aug 5 11:53:02 2022 +0800

    [FLINK-28710][table-planner] Supports dynamic filtering execution
    
    This closes #20472
---
 .../table/planner/plan/nodes/exec/ExecEdge.java    |   8 +
 .../planner/plan/nodes/exec/ExecNodeBase.java      |   4 +
 .../BatchExecDynamicFilteringDataCollector.java    | 107 ++++
 .../nodes/exec/batch/BatchExecTableSourceScan.java |  82 ++-
 .../exec/common/CommonExecTableSourceScan.java     |   6 +-
 .../DynamicFilteringDependencyProcessor.java       |  87 +++
 .../processor/ResetTransformationProcessor.java    |  46 ++
 .../utils/InputPriorityConflictResolver.java       |  24 +-
 .../exec/stream/StreamExecTableSourceScan.java     |  11 +-
 .../table/planner/delegation/BatchPlanner.scala    |  11 +-
 .../table/planner/delegation/PlannerBase.scala     |  10 +
 .../table/planner/delegation/StreamPlanner.scala   |   6 +-
 ...atchPhysicalDynamicFilteringDataCollector.scala |  13 +-
 ...chPhysicalDynamicFilteringTableSourceScan.scala |   3 +-
 .../source/DynamicFilteringValuesSource.java       | 129 ++++
 .../source/DynamicFilteringValuesSourceReader.java | 185 ++++++
 .../DynamicFilteringValuesSourceEnumerator.java    | 149 +++++
 .../source/split/ValuesSourcePartitionSplit.java   |  48 ++
 .../ValuesSourcePartitionSplitSerializer.java      |  76 +++
 .../planner/factories/TestValuesTableFactory.java  | 101 +--
 .../plan/batch/sql/DynamicFilteringTest.java       | 162 +++++
 .../utils/InputPriorityConflictResolverTest.java   |  54 ++
 .../runtime/batch/sql/DynamicFilteringITCase.java  | 268 ++++++++
 .../plan/batch/sql/DynamicFilteringTest.xml        | 713 +++++++++++++++++++++
 .../nodes/exec/operator/BatchOperatorNameTest.xml  | 232 +++----
 .../table/planner/runtime/utils/TestData.scala     |  16 +
 26 files changed, 2377 insertions(+), 174 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
index ca2559e4559..4a6ee683495 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
@@ -113,6 +113,14 @@ public class ExecEdge {
         private Shuffle shuffle = FORWARD_SHUFFLE;
         private StreamExchangeMode exchangeMode = StreamExchangeMode.PIPELINED;
 
+        public Builder from(ExecEdge original) {
+            this.source = original.source;
+            this.target = original.target;
+            this.shuffle = original.shuffle;
+            this.exchangeMode = original.exchangeMode;
+            return this;
+        }
+
         public Builder source(ExecNode<?> source) {
             this.source = source;
             return this;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 06b88507b3f..843abc2d992 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -245,4 +245,8 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
         }
         return detailName;
     }
+
+    public void resetTransformation() {
+        this.transformation = null;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
new file mode 100644
index 00000000000..f12423074ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Batch {@link ExecNode} that collects inputs and builds {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringData}, and then sends the {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringEvent} to the source coordinator.
+ */
+public class BatchExecDynamicFilteringDataCollector extends ExecNodeBase<Object>
+        implements BatchExecNode<Object> {
+
+    @Experimental
+    private static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =
+            key("table.exec.dynamic-filtering.threshold")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("8 mb"))
+                    .withDescription(
+                            "If the collector collects more data than the threshold (default is 8M), "
+                                    + "an empty DynamicFilterEvent with a flag only will be sent to Coordinator, "
+                                    + "which could avoid exceeding the akka limit and out-of-memory (see "
+                                    + AkkaOptions.FRAMESIZE.key()
+                                    + "). Otherwise a DynamicFilterEvent with all deduplicated records will be sent to Coordinator.");
+
+    private final List<Integer> dynamicFilteringFieldIndices;
+
+    public BatchExecDynamicFilteringDataCollector(
+            List<Integer> dynamicFilteringFieldIndices,
+            ReadableConfig tableConfig,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(BatchExecTableSourceScan.class),
+                ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        checkArgument(outputType.getFieldCount() == dynamicFilteringFieldIndices.size());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<Object> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+        StreamOperatorFactory<Object> factory =
+                new DynamicFilteringDataCollectorOperatorFactory(
+                        (RowType) getOutputType(),
+                        dynamicFilteringFieldIndices,
+                        config.get(TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD).getBytes());
+
+        return ExecNodeUtil.createOneInputTransformation(
+                inputTransform,
+                createTransformationName(config),
+                createTransformationDescription(config),
+                factory,
+                InternalTypeInfo.of(getOutputType()),
+                1); // parallelism should always be 1
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index 95e98273fb9..f0fd611cb04 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -23,17 +23,28 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.ExecutionOrderEnforcerOperatorFactory;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.UUID;
 
 /**
  * Batch {@link ExecNode} to read data from an external source defined by a bounded {@link
@@ -42,9 +53,17 @@ import org.apache.flink.table.types.logical.RowType;
 public class BatchExecTableSourceScan extends CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
+    // Avoids creating different ids if translated multiple times
+    private final String dynamicFilteringDataListenerID = UUID.randomUUID().toString();
+
+    private boolean needDynamicFilteringDependency;
+
+    // This constructor can be used only when table source scan has
+    // BatchExecDynamicFilteringDataCollector input
     public BatchExecTableSourceScan(
             ReadableConfig tableConfig,
             DynamicTableSourceSpec tableSourceSpec,
+            InputProperty inputProperty,
             RowType outputType,
             String description) {
         super(
@@ -52,10 +71,30 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
                 ExecNodeContext.newContext(BatchExecTableSourceScan.class),
                 ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
                 tableSourceSpec,
+                Collections.singletonList(inputProperty),
                 outputType,
                 description);
     }
 
+    public BatchExecTableSourceScan(
+            ReadableConfig tableConfig,
+            DynamicTableSourceSpec tableSourceSpec,
+            RowType outputType,
+            String description) {
+        super(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(BatchExecTableSourceScan.class),
+                ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
+                tableSourceSpec,
+                Collections.emptyList(),
+                outputType,
+                description);
+    }
+
+    public void setNeedDynamicFilteringDependency(boolean needDynamicFilteringDependency) {
+        this.needDynamicFilteringDependency = needDynamicFilteringDependency;
+    }
+
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
@@ -64,7 +103,48 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
         // the boundedness has been checked via the runtime provider already, so we can safely
         // declare all legacy transformations as bounded to make the stream graph generator happy
         ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
-        return transformation;
+
+        // no dynamic filtering applied
+        if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {
+            return transformation;
+        }
+
+        // handle dynamic filtering
+        Preconditions.checkState(getInputEdges().size() == 1);
+        BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource();
+        if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+            throw new TableException(
+                    "The source input must be BatchExecDynamicFilteringDataCollector for now");
+        }
+        BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                (BatchExecDynamicFilteringDataCollector) input;
+
+        ((SourceTransformation<?, ?, ?>) transformation)
+                .setCoordinatorListeningID(dynamicFilteringDataListenerID);
+
+        // Must use translateToPlan to avoid duplication dynamic filters.
+        Transformation<Object> dynamicFilteringTransform =
+                dynamicFilteringDataCollector.translateToPlan(planner);
+        ((DynamicFilteringDataCollectorOperatorFactory)
+                        ((OneInputTransformation<?, ?>) dynamicFilteringTransform)
+                                .getOperatorFactory())
+                .registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID);
+
+        if (!needDynamicFilteringDependency) {
+            planner.addExtraTransformation(dynamicFilteringTransform);
+            return transformation;
+        } else {
+            MultipleInputTransformation<RowData> multipleInputTransformation =
+                    new MultipleInputTransformation<>(
+                            "Order-Enforcer",
+                            new ExecutionOrderEnforcerOperatorFactory<>(),
+                            transformation.getOutputType(),
+                            transformation.getParallelism());
+            multipleInputTransformation.addInput(dynamicFilteringTransform);
+            multipleInputTransformation.addInput(transformation);
+
+            return multipleInputTransformation;
+        }
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 0576958b8a2..97d5fd86455 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
@@ -56,7 +57,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -77,9 +78,10 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
             ExecNodeContext context,
             ReadableConfig persistedConfig,
             DynamicTableSourceSpec tableSourceSpec,
+            List<InputProperty> inputProperties,
             LogicalType outputType,
             String description) {
-        super(id, context, persistedConfig, Collections.emptyList(), outputType, description);
+        super(id, context, persistedConfig, inputProperties, outputType, description);
         this.tableSourceSpec = tableSourceSpec;
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java
new file mode 100644
index 00000000000..dbbb48ea0e4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This processor future checks each dynamic filter source to see if it is chained with a multiple
+ * input operator. If so, we'll set the dependency flag.
+ *
+ * <p>NOTE: This processor can be only applied on {@link BatchExecNode} DAG.
+ */
+public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcessor {
+
+    @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+        Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants =
+                new HashMap<>();
+
+        AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        node.getInputEdges().stream()
+                                .map(ExecEdge::getSource)
+                                .forEach(
+                                        input -> {
+                                            // The character of the dynamic filter scan is that it
+                                            // has an input.
+                                            if (input instanceof BatchExecTableSourceScan
+                                                    && input.getInputEdges().size() > 0) {
+                                                dynamicFilteringScanDescendants
+                                                        .computeIfAbsent(
+                                                                (BatchExecTableSourceScan) input,
+                                                                ignored -> new ArrayList<>())
+                                                        .add(node);
+                                            }
+                                        });
+
+                        visitInputs(node);
+                    }
+                };
+        execGraph.getRootNodes().forEach(node -> node.accept(dynamicFilteringScanCollector));
+
+        for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+                dynamicFilteringScanDescendants.entrySet()) {
+            if (entry.getValue().size() == 1) {
+                ExecNode<?> next = entry.getValue().get(0);
+                if (next instanceof BatchExecMultipleInput) {
+                    // the source can be chained with BatchExecMultipleInput
+                    continue;
+                }
+            }
+            // otherwise we need dependencies
+            entry.getKey().setNeedDynamicFilteringDependency(true);
+        }
+
+        return execGraph;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java
new file mode 100644
index 00000000000..4c090786922
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+
+/**
+ * A {@link ExecNodeGraphProcessor} that reset the Transformation to null generated by other
+ * processors, such as: {@link MultipleInputNodeCreationProcessor}.
+ */
+public class ResetTransformationProcessor implements ExecNodeGraphProcessor {
+
+    @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+        AbstractExecNodeExactlyOnceVisitor visitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        ((ExecNodeBase<?>) node).resetTransformation();
+                        visitInputs(node);
+                    }
+                };
+        execGraph.getRootNodes().forEach(r -> r.accept(visitor));
+        return execGraph;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index a6588df84d9..144b527c520 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -25,11 +25,14 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode;
 
@@ -59,7 +62,7 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
             InputProperty.DamBehavior safeDamBehavior,
             StreamExchangeMode exchangeMode,
             ReadableConfig tableConfig) {
-        super(roots, Collections.emptySet(), safeDamBehavior);
+        super(roots, getDynamicFilteringSourceNodes(roots), safeDamBehavior);
         this.exchangeMode = exchangeMode;
         this.tableConfig = tableConfig;
     }
@@ -152,6 +155,25 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
         return exchange;
     }
 
+    private static Set<ExecNode<?>> getDynamicFilteringSourceNodes(List<ExecNode<?>> roots) {
+        Set<ExecNode<?>> sourceNodes = new HashSet<>();
+        AbstractExecNodeExactlyOnceVisitor visitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        // skip the input of DynamicFiltering TableScanSource
+                        if (node instanceof CommonExecTableSourceScan
+                                && node.getInputEdges().size() > 0) {
+                            sourceNodes.add(node);
+                        } else {
+                            visitInputs(node);
+                        }
+                    }
+                };
+        roots.forEach(n -> n.accept(visitor));
+        return sourceNodes;
+    }
+
     private static class ConflictCausedByExchangeChecker
             extends AbstractExecNodeExactlyOnceVisitor {
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 9fb292bd960..87df2a677d8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collections;
+
 /**
  * Stream {@link ExecNode} to read data from an external source defined by a {@link
  * ScanTableSource}.
@@ -71,7 +73,14 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
             @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec tableSourceSpec,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
-        super(id, context, persistedConfig, tableSourceSpec, outputType, description);
+        super(
+                id,
+                context,
+                persistedConfig,
+                tableSourceSpec,
+                Collections.emptyList(),
+                outputType,
+                description);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 83004ec0f74..00f901d30e5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.operations.{ModifyOperation, Operation}
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
+import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor, ResetTransformationProcessor}
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlExplainLevel
 import java.util
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 class BatchPlanner(
     executor: Executor,
@@ -77,6 +78,10 @@ class BatchPlanner(
       processors.add(new MultipleInputNodeCreationProcessor(false))
     }
     processors.add(new ForwardHashExchangeProcessor)
+    if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+      processors.add(new DynamicFilteringDependencyProcessor)
+      processors.add(new ResetTransformationProcessor)
+    }
     processors
   }
 
@@ -92,13 +97,13 @@ class BatchPlanner(
             "This is a bug and should not happen. Please file an issue.")
     }
     afterTranslation()
-    transformations
+    transformations ++ planner.extraTransformations
   }
 
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations)
 
-    val sb = new StringBuilder
+    val sb = new mutable.StringBuilder
     sb.append("== Abstract Syntax Tree ==")
     sb.append(System.lineSeparator)
     sinkRelNodes.foreach {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index e2ef76b26eb..d0ca7bba606 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -104,6 +104,9 @@ abstract class PlannerBase(
   private var parser: Parser = _
   private var extendedOperationExecutor: ExtendedOperationExecutor = _
   private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
+  // the transformations generated in translateToPlan method, they are not connected
+  // with sink transformations but also are needed in the final graph.
+  private[flink] val extraTransformations = new util.ArrayList[Transformation[_]]()
 
   @VisibleForTesting
   private[flink] val plannerContext: PlannerContext =
@@ -358,6 +361,12 @@ abstract class PlannerBase(
    */
   protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
 
+  def addExtraTransformation(transformation: Transformation[_]): Unit = {
+    if (!extraTransformations.contains(transformation)) {
+      extraTransformations.add(transformation)
+    }
+  }
+
   private def getTableSink(
       contextResolvedTable: ContextResolvedTable,
       dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
@@ -480,6 +489,7 @@ abstract class PlannerBase(
 
     // Clean caches that might have filled up during optimization
     CompileUtils.cleanUp()
+    extraTransformations.clear()
   }
 
   /** Returns all the graphs required to execute EXPLAIN */
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 739d85517b8..2431d9c45e8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -46,6 +46,8 @@ import org.apache.calcite.sql.SqlExplainLevel
 import java.io.{File, IOException}
 import java.util
 
+import scala.collection.mutable
+
 class StreamPlanner(
     executor: Executor,
     tableConfig: TableConfig,
@@ -87,13 +89,13 @@ class StreamPlanner(
             "This is a bug and should not happen. Please file an issue.")
     }
     afterTranslation()
-    transformations
+    transformations ++ planner.extraTransformations
   }
 
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations)
 
-    val sb = new StringBuilder
+    val sb = new mutable.StringBuilder
     sb.append("== Abstract Syntax Tree ==")
     sb.append(System.lineSeparator)
     sinkRelNodes.foreach {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
index 58892934ba3..1c4cf7147d4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
@@ -17,7 +17,11 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
@@ -54,6 +58,11 @@ class BatchPhysicalDynamicFilteringDataCollector(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
-    throw new UnsupportedOperationException()
+    new BatchExecDynamicFilteringDataCollector(
+      JavaScalaConversionUtil.toJava(dynamicFilteringFieldIndices.map(i => Integer.valueOf(i))),
+      unwrapTableConfig(this),
+      InputProperty.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
index ca830616677..90fbc030845 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
 import org.apache.flink.table.planner.plan.schema.TableSourceTable
@@ -88,6 +88,7 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
     new BatchExecTableSourceScan(
       unwrapTableConfig(this),
       tableSourceSpec,
+      InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java
new file mode 100644
index 00000000000..0622169b8cc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.DynamicFilteringValuesSourceEnumerator;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Source} implementation that reads data from a partitioned list.
+ *
+ * <p>This source is useful for dynamic filtering testing.
+ */
+public class DynamicFilteringValuesSource
+        implements Source<RowData, ValuesSourcePartitionSplit, NoOpEnumState> {
+
+    private final TypeSerializer<RowData> serializer;
+    private Map<Map<String, String>, byte[]> serializedElements;
+    private Map<Map<String, String>, Integer> counts;
+    private final List<String> dynamicFilteringFields;
+
+    public DynamicFilteringValuesSource(
+            Map<Map<String, String>, Collection<RowData>> elements,
+            TypeSerializer<RowData> serializer,
+            List<String> dynamicFilteringFields) {
+        this.serializer = serializer;
+        this.dynamicFilteringFields = dynamicFilteringFields;
+        serializeElements(serializer, elements);
+    }
+
+    private void serializeElements(
+            TypeSerializer<RowData> serializer,
+            Map<Map<String, String>, Collection<RowData>> elements) {
+        Preconditions.checkState(serializer != null, "serializer not set");
+
+        serializedElements = new HashMap<>();
+        counts = new HashMap<>();
+        for (Map<String, String> partition : elements.keySet()) {
+            Collection<RowData> collection = elements.get(partition);
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
+                for (RowData e : collection) {
+                    serializer.serialize(e, wrapper);
+                }
+                byte[] value = baos.toByteArray();
+                serializedElements.put(partition, value);
+            } catch (Exception e) {
+                throw new TableException(
+                        "Serializing the source elements failed: " + e.getMessage(), e);
+            }
+            counts.put(partition, collection.size());
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, ValuesSourcePartitionSplit> createReader(
+            SourceReaderContext readerContext) throws Exception {
+        return new DynamicFilteringValuesSourceReader(
+                serializedElements, counts, serializer, readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> createEnumerator(
+            SplitEnumeratorContext<ValuesSourcePartitionSplit> context) throws Exception {
+        List<ValuesSourcePartitionSplit> splits =
+                serializedElements.keySet().stream()
+                        .map(ValuesSourcePartitionSplit::new)
+                        .collect(Collectors.toList());
+        return new DynamicFilteringValuesSourceEnumerator(context, splits, dynamicFilteringFields);
+    }
+
+    @Override
+    public SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> restoreEnumerator(
+            SplitEnumeratorContext<ValuesSourcePartitionSplit> context, NoOpEnumState checkpoint) {
+        throw new UnsupportedOperationException("Unsupported now.");
+    }
+
+    @Override
+    public SimpleVersionedSerializer<ValuesSourcePartitionSplit> getSplitSerializer() {
+        return new ValuesSourcePartitionSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+        return new NoOpEnumStateSerializer();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java
new file mode 100644
index 00000000000..71e0dd05ee4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class DynamicFilteringValuesSourceReader
+        implements SourceReader<RowData, ValuesSourcePartitionSplit> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringValuesSourceReader.class);
+
+    /** The context for this reader, to communicate with the enumerator. */
+    private final SourceReaderContext context;
+
+    /** The availability future. This reader is available as soon as a split is assigned. */
+    private CompletableFuture<Void> availability;
+
+    private final TypeSerializer<RowData> serializer;
+    private final Map<Map<String, String>, byte[]> serializedElements;
+    private final Map<Map<String, String>, Integer> counts;
+    private final Queue<ValuesSourcePartitionSplit> remainingSplits;
+
+    private transient ValuesSourcePartitionSplit currentSplit;
+    private transient Iterator<RowData> iterator;
+    private transient boolean noMoreSplits;
+
+    public DynamicFilteringValuesSourceReader(
+            Map<Map<String, String>, byte[]> serializedElements,
+            Map<Map<String, String>, Integer> counts,
+            TypeSerializer<RowData> serializer,
+            SourceReaderContext context) {
+        this.serializedElements = checkNotNull(serializedElements);
+        this.counts = checkNotNull(counts);
+        this.serializer = serializer;
+        this.context = checkNotNull(context);
+        this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
+    }
+
+    @Override
+    public void start() {
+        // request a split if we don't have one
+        if (remainingSplits.isEmpty()) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<RowData> output) {
+        if (iterator != null) {
+            if (iterator.hasNext()) {
+                output.collect(iterator.next());
+                return InputStatus.MORE_AVAILABLE;
+            } else {
+                finishSplit();
+            }
+        }
+
+        return tryMoveToNextSplit();
+    }
+
+    private void finishSplit() {
+        iterator = null;
+        currentSplit = null;
+
+        // request another split if no other is left
+        // we do this only here in the finishSplit part to avoid requesting a split
+        // whenever the reader is polled and doesn't currently have a split
+        if (remainingSplits.isEmpty() && !noMoreSplits) {
+            context.sendSplitRequest();
+        }
+    }
+
+    private InputStatus tryMoveToNextSplit() {
+        currentSplit = remainingSplits.poll();
+        if (currentSplit != null) {
+            Map<String, String> partition = currentSplit.getPartition();
+            List<RowData> list =
+                    deserialize(serializedElements.get(partition), counts.get(partition));
+            iterator = list.iterator();
+            return InputStatus.MORE_AVAILABLE;
+        } else if (noMoreSplits) {
+            return InputStatus.END_OF_INPUT;
+        } else {
+            // ensure we are not called in a loop by resetting the availability future
+            if (availability.isDone()) {
+                availability = new CompletableFuture<>();
+            }
+
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+    }
+
+    private List<RowData> deserialize(byte[] data, int count) {
+        List<RowData> list = new ArrayList<>();
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+            final DataInputView input = new DataInputViewStreamWrapper(bais);
+            for (int i = 0; i < count; ++i) {
+                RowData element = serializer.deserialize(input);
+                list.add(element);
+            }
+        } catch (Exception e) {
+            throw new TableException(
+                    "Failed to deserialize an element from the source. "
+                            + "If you are using user-defined serialization (Value and Writable types), check the "
+                            + "serialization functions.\nSerializer is "
+                            + serializer,
+                    e);
+        }
+        return list;
+    }
+
+    @Override
+    public List<ValuesSourcePartitionSplit> snapshotState(long checkpointId) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return availability;
+    }
+
+    @Override
+    public void addSplits(List<ValuesSourcePartitionSplit> splits) {
+        remainingSplits.addAll(splits);
+        // set availability so that pollNext is actually called
+        availability.complete(null);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        noMoreSplits = true;
+        // set availability so that pollNext is actually called
+        availability.complete(null);
+    }
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.info("checkpoint {} finished.", checkpointId);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java
new file mode 100644
index 00000000000..83b09b0ffcc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.enumerator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A SplitEnumerator implementation for dynamic filtering source. */
+public class DynamicFilteringValuesSourceEnumerator
+        implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context;
+    private final List<ValuesSourcePartitionSplit> allSplits;
+    private final List<String> dynamicFilteringFields;
+    private transient boolean receivedDynamicFilteringEvent;
+    private transient List<ValuesSourcePartitionSplit> remainingSplits;
+
+    public DynamicFilteringValuesSourceEnumerator(
+            SplitEnumeratorContext<ValuesSourcePartitionSplit> context,
+            List<ValuesSourcePartitionSplit> allSplits,
+            List<String> dynamicFilteringFields) {
+        this.context = context;
+        this.allSplits = allSplits;
+        this.dynamicFilteringFields = dynamicFilteringFields;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        if (!receivedDynamicFilteringEvent) {
+            throw new IllegalStateException("DynamicFilteringEvent has not receive");
+        }
+        if (remainingSplits.isEmpty()) {
+            context.signalNoMoreSplits(subtaskId);
+            LOG.info("No more splits available for subtask {}", subtaskId);
+        } else {
+            ValuesSourcePartitionSplit split = remainingSplits.remove(0);
+            LOG.debug("Assigned split to subtask {} : {}", subtaskId, split);
+            context.assignSplit(split, subtaskId);
+        }
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof DynamicFilteringEvent) {
+            LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);
+            receivedDynamicFilteringEvent = true;
+            DynamicFilteringData dynamicFilteringData =
+                    ((DynamicFilteringEvent) sourceEvent).getData();
+            assignPartitions(dynamicFilteringData);
+        } else {
+            LOG.error("Received unrecognized event: {}", sourceEvent);
+        }
+    }
+
+    private void assignPartitions(DynamicFilteringData data) {
+        if (data.isFiltering()) {
+            remainingSplits = new ArrayList<>();
+            for (ValuesSourcePartitionSplit split : allSplits) {
+                List<String> values =
+                        dynamicFilteringFields.stream()
+                                .map(k -> split.getPartition().get(k))
+                                .collect(Collectors.toList());
+                LOG.info("values: " + values);
+                if (data.contains(generateRowData(values, data.getRowType()))) {
+                    remainingSplits.add(split);
+                }
+            }
+        } else {
+            remainingSplits = new ArrayList<>(allSplits);
+        }
+        LOG.info("remainingSplits: " + remainingSplits);
+    }
+
+    private GenericRowData generateRowData(List<String> partitionValues, RowType rowType) {
+        Preconditions.checkArgument(partitionValues.size() == rowType.getFieldCount());
+        Object[] values = new Object[partitionValues.size()];
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            switch (rowType.getTypeAt(i).getTypeRoot()) {
+                case VARCHAR:
+                    values[i] = partitionValues.get(i);
+                    break;
+                case INTEGER:
+                    values[i] = Integer.valueOf(partitionValues.get(i));
+                    break;
+                case BIGINT:
+                    values[i] = Long.valueOf(partitionValues.get(i));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            rowType.getTypeAt(i).getTypeRoot() + " is not supported.");
+            }
+        }
+        return GenericRowData.of(values);
+    }
+
+    @Override
+    public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int subtaskId) {
+        remainingSplits.addAll(splits);
+    }
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+        return new NoOpEnumState();
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java
new file mode 100644
index 00000000000..a8eab5d6241
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
+
+import java.util.Map;
+
+/** The split of the {@link DynamicFilteringValuesSource}. */
+public class ValuesSourcePartitionSplit implements SourceSplit {
+
+    private final Map<String, String> partition;
+
+    public ValuesSourcePartitionSplit(Map<String, String> partition) {
+        this.partition = partition;
+    }
+
+    public Map<String, String> getPartition() {
+        return partition;
+    }
+
+    @Override
+    public String splitId() {
+        return partition.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "ValuesSourcePartitionSplit{" + "partition='" + partition + '\'' + '}';
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java
new file mode 100644
index 00000000000..416c5268e88
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.split;
+
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** The split serializer for the {@link DynamicFilteringValuesSource}. */
+public class ValuesSourcePartitionSplitSerializer
+        implements SimpleVersionedSerializer<ValuesSourcePartitionSplit> {
+
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(ValuesSourcePartitionSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            Map<String, String> partition = split.getPartition();
+            out.writeUTF(split.splitId());
+            out.writeInt(partition.size());
+            for (Map.Entry<String, String> entry : partition.entrySet()) {
+                out.writeUTF(entry.getKey());
+                out.writeUTF(entry.getValue());
+            }
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public ValuesSourcePartitionSplit deserialize(int version, byte[] serialized)
+            throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            Map<String, String> partition = new HashMap<>();
+            String splitId = in.readUTF();
+            int size = in.readInt();
+            for (int i = 0; i < size; ++i) {
+                String key = in.readUTF();
+                String value = in.readUTF();
+                partition.put(key, value);
+            }
+            ValuesSourcePartitionSplit split = new ValuesSourcePartitionSplit(partition);
+            Preconditions.checkArgument(split.splitId().equals(splitId));
+            return split;
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 7960f071e50..b3f545fe718 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
 import org.apache.flink.connector.source.ValuesSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -743,7 +744,8 @@ public final class TestValuesTableFactory
                     SupportsLimitPushDown,
                     SupportsPartitionPushDown,
                     SupportsReadingMetadata,
-                    SupportsAggregatePushDown {
+                    SupportsAggregatePushDown,
+                    SupportsDynamicFiltering {
 
         protected DataType producedDataType;
         protected final ChangelogMode changelogMode;
@@ -765,6 +767,7 @@ public final class TestValuesTableFactory
 
         private @Nullable int[] groupingSet;
         private List<AggregateExpression> aggregateExpressions;
+        private List<String> acceptedPartitionFilterFields;
 
         private TestValuesScanTableSourceWithoutProjectionPushDown(
                 DataType producedDataType,
@@ -817,11 +820,11 @@ public final class TestValuesTableFactory
                     runtimeProviderContext.createDataStructureConverter(producedDataType);
             converter.open(
                     RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
-            Collection<RowData> values = convertToRowData(converter);
 
             switch (runtimeSource) {
                 case "SourceFunction":
                     try {
+                        Collection<RowData> values = convertToRowData(converter);
                         final SourceFunction<RowData> sourceFunction;
                         if (failingSource) {
                             sourceFunction =
@@ -838,14 +841,16 @@ public final class TestValuesTableFactory
                     checkArgument(
                             !failingSource,
                             "Values InputFormat Source doesn't support as failing source.");
+                    Collection<RowData> values = convertToRowData(converter);
                     return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
                 case "DataStream":
                     checkArgument(
                             !failingSource,
                             "Values DataStream Source doesn't support as failing source.");
                     try {
+                        Collection<RowData> values2 = convertToRowData(converter);
                         FromElementsFunction<RowData> function =
-                                new FromElementsFunction<>(serializer, values);
+                                new FromElementsFunction<>(serializer, values2);
                         return new DataStreamScanProvider() {
                             @Override
                             public DataStream<RowData> produceDataStream(
@@ -870,7 +875,18 @@ public final class TestValuesTableFactory
                 case "NewSource":
                     checkArgument(
                             !failingSource, "Values Source doesn't support as failing new source.");
-                    return SourceProvider.of(new ValuesSource(values, serializer));
+                    if (acceptedPartitionFilterFields == null
+                            || acceptedPartitionFilterFields.isEmpty()) {
+                        Collection<RowData> values2 = convertToRowData(converter);
+                        return SourceProvider.of(new ValuesSource(values2, serializer));
+                    } else {
+                        Map<Map<String, String>, Collection<RowData>> partitionValues =
+                                convertToPartitionedRowData(converter);
+                        DynamicFilteringValuesSource source =
+                                new DynamicFilteringValuesSource(
+                                        partitionValues, serializer, acceptedPartitionFilterFields);
+                        return SourceProvider.of(source);
+                    }
                 default:
                     throw new IllegalArgumentException(
                             "Unsupported runtime source class: " + runtimeSource);
@@ -931,8 +947,21 @@ public final class TestValuesTableFactory
 
         protected Collection<RowData> convertToRowData(DataStructureConverter converter) {
             List<RowData> result = new ArrayList<>();
+            for (Collection<RowData> rowData : convertToPartitionedRowData(converter).values()) {
+                result.addAll(rowData);
+            }
+            return result;
+        }
+
+        protected Map<Map<String, String>, Collection<RowData>> convertToPartitionedRowData(
+                DataStructureConverter converter) {
+            Map<Map<String, String>, Collection<RowData>> result = new HashMap<>();
+            int size = 0;
             int numSkipped = 0;
             for (Map<String, String> partition : data.keySet()) {
+                List<RowData> partitionResult = new ArrayList<>();
+                result.put(partition, partitionResult);
+
                 Collection<Row> rowsInPartition = data.get(partition);
 
                 // handle element skipping
@@ -968,11 +997,12 @@ public final class TestValuesTableFactory
                     final RowData rowData = (RowData) converter.toInternal(row);
                     if (rowData != null) {
                         rowData.setRowKind(row.getKind());
-                        result.add(rowData);
+                        partitionResult.add(rowData);
+                        size++;
                     }
 
                     // handle limit. No aggregates will be pushed down when there is a limit.
-                    if (result.size() >= limit) {
+                    if (size >= limit) {
                         return result;
                     }
                 }
@@ -1214,12 +1244,31 @@ public final class TestValuesTableFactory
             projectedMetadataFields =
                     remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
         }
+
+        @Override
+        public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
+            if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
+                checkArgument(!candidateFilterFields.isEmpty());
+                acceptedPartitionFilterFields = new ArrayList<>();
+                for (String field : candidateFilterFields) {
+                    if (dynamicFilteringFields.contains(field)) {
+                        acceptedPartitionFilterFields.add(field);
+                    }
+                }
+
+                return new ArrayList<>(acceptedPartitionFilterFields);
+            } else {
+                throw new UnsupportedOperationException(
+                        "Should adding dynamic filtering fields by adding factor"
+                                + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
+            }
+        }
     }
 
     /** Values {@link ScanTableSource} for testing that supports projection push down. */
     private static class TestValuesScanTableSource
             extends TestValuesScanTableSourceWithoutProjectionPushDown
-            implements SupportsProjectionPushDown, SupportsDynamicFiltering {
+            implements SupportsProjectionPushDown {
 
         private TestValuesScanTableSource(
                 DataType producedDataType,
@@ -1290,25 +1339,6 @@ public final class TestValuesTableFactory
             // we can't immediately project the data here,
             // because ReadingMetadataSpec may bring new fields
         }
-
-        @Override
-        public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-            if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
-                checkArgument(!candidateFilterFields.isEmpty());
-                List<String> acceptedPartitionFields = new ArrayList<>();
-                for (String field : candidateFilterFields) {
-                    if (dynamicFilteringFields.contains(field)) {
-                        acceptedPartitionFields.add(field);
-                    }
-                }
-
-                return acceptedPartitionFields;
-            } else {
-                throw new UnsupportedOperationException(
-                        "Should adding dynamic filtering fields by adding factor"
-                                + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
-            }
-        }
     }
 
     /** Values {@link ScanTableSource} for testing that supports watermark push down. */
@@ -1547,25 +1577,6 @@ public final class TestValuesTableFactory
                     readableMetadata,
                     projectedMetadataFields);
         }
-
-        @Override
-        public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-            if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
-                checkArgument(!candidateFilterFields.isEmpty());
-                List<String> acceptedPartitionFields = new ArrayList<>();
-                for (String field : candidateFilterFields) {
-                    if (dynamicFilteringFields.contains(field)) {
-                        acceptedPartitionFields.add(field);
-                    }
-                }
-
-                return acceptedPartitionFields;
-            } else {
-                throw new UnsupportedOperationException(
-                        "Should adding dynamic filtering fields by adding factor"
-                                + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
-            }
-        }
     }
 
     /** A mocked {@link LookupTableSource} for validation test. */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java
new file mode 100644
index 00000000000..d76f3c3908e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Plan test for dynamic filtering. */
+public class DynamicFilteringTest extends TableTestBase {
+
+    private BatchTableTestUtil util;
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10);
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE fact1 (\n"
+                                + "  a1 bigint,\n"
+                                + "  b1 int,\n"
+                                + "  c1 varchar,\n"
+                                + "  p1 varchar\n"
+                                + ") PARTITIONED BY(p1) WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'runtime-source' = 'NewSource',\n"
+                                + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+                                + " 'dynamic-filtering-fields' = 'p1;b1',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE fact2 (\n"
+                                + "  a2 bigint,\n"
+                                + "  b2 int,\n"
+                                + "  c2 varchar,\n"
+                                + "  p2 varchar\n"
+                                + ") PARTITIONED BY(p2) WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'runtime-source' = 'NewSource',\n"
+                                + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+                                + " 'dynamic-filtering-fields' = 'p2',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE dim (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT,\n"
+                                + "  z VARCHAR,\n"
+                                + "  p VARCHAR\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+    }
+
+    @Test
+    public void testLegacySource() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE legacy_source (\n"
+                                + "  a1 BIGINT,\n"
+                                + "  b1 BIGINT,\n"
+                                + "  c1 VARCHAR,\n"
+                                + "  d1 BIGINT,\n"
+                                + "  p1 VARCHAR\n"
+                                + ") PARTITIONED BY (p1)\n"
+                                + "  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'runtime-source' = 'SourceFunction',\n"
+                                + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+                                + " 'dynamic-filtering-fields' = 'p1',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+        util.verifyExplain(
+                "SELECT * FROM legacy_source, dim WHERE p1 = p AND x > 10",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+
+    @Test
+    public void testSimpleDynamicFiltering() {
+        // the execution plan contains 'Placeholder-Filter' operator
+        util.verifyExplain(
+                "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+
+    @Test
+    public void testDynamicFilteringWithMultipleInput() {
+        // the execution plan does not contain 'Placeholder-Filter' operator
+        util.verifyExplain(
+                "SELECT * FROM fact1, dim, fact2 WHERE p1 = p and p1 = p2 AND x > 10",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+
+    @Test
+    public void testDuplicateFactTables() {
+        // the fact tables can not be reused
+        util.verifyExplain(
+                "SELECT * FROM (SELECT * FROM fact1, dim WHERE p1 = p AND x > 10) t1 JOIN fact1 t2 ON t1.y = t2.b1",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+
+    @Test
+    public void testReuseDimSide() {
+        // dynamic filtering collector will be reused for both fact tables
+        util.verifyExplain(
+                "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 "
+                        + "UNION ALL "
+                        + "SELECT * FROM fact2, dim WHERE p2 = p AND x > 10",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+
+    @Test
+    public void testDynamicFilteringWithStaticPartitionPruning() {
+        util.verifyExplain(
+                "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 and p1 > 1",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index e79899d496c..cf76f5ba927 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.Test;
@@ -147,4 +151,54 @@ public class InputPriorityConflictResolverTest {
         checkExchange.accept(input0);
         checkExchange.accept(input1);
     }
+
+    @Test
+    public void testWithDynamicFilteringPlan() {
+        // no conflicts for dpp pattern
+        // 2 --------------------------------------(P1)--- 1 --(P0)--> 0
+        //   \                                            /
+        //   DynamicFilteringDataCollector               /
+        //     \                                        /
+        //    DynamicFilteringTableSourceScan --(P0) --/
+        TestingBatchExecNode[] nodes = new TestingBatchExecNode[3];
+        for (int i = 0; i < nodes.length; i++) {
+            nodes[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
+        }
+
+        BatchExecTableSourceScan scan =
+                new BatchExecTableSourceScan(
+                        new Configuration(),
+                        new DynamicTableSourceSpec(null, null),
+                        InputProperty.DEFAULT,
+                        RowType.of(new IntType(), new IntType(), new IntType()),
+                        "DynamicFilteringTableSourceScan");
+        BatchExecDynamicFilteringDataCollector collector =
+                new BatchExecDynamicFilteringDataCollector(
+                        Collections.singletonList(1),
+                        new Configuration(),
+                        InputProperty.DEFAULT,
+                        RowType.of(new IntType()),
+                        "DynamicFilteringDataCollector");
+
+        nodes[0].addInput(nodes[1], InputProperty.builder().priority(0).build());
+        nodes[1].addInput(nodes[2], InputProperty.builder().priority(1).build());
+        nodes[1].addInput(scan, InputProperty.builder().priority(0).build());
+        ExecEdge collect2Scan = ExecEdge.builder().source(collector).target(scan).build();
+        scan.setInputEdges(Collections.singletonList(collect2Scan));
+        ExecEdge toCollector = ExecEdge.builder().source(nodes[2]).target(collector).build();
+        collector.setInputEdges(Collections.singletonList(toCollector));
+
+        InputPriorityConflictResolver resolver =
+                new InputPriorityConflictResolver(
+                        Collections.singletonList(nodes[1]),
+                        InputProperty.DamBehavior.END_INPUT,
+                        StreamExchangeMode.BATCH,
+                        new Configuration());
+        resolver.detectAndResolve();
+
+        ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
+        ExecNode<?> input1 = nodes[1].getInputNodes().get(1);
+        assertThat(input0).isSameAs(nodes[2]);
+        assertThat(input1).isSameAs(scan);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java
new file mode 100644
index 00000000000..acbc4937f58
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** IT test for dynamic filtering. */
+public class DynamicFilteringITCase extends BatchTestBase {
+
+    private TableEnvironment tEnv;
+    private Catalog catalog;
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        tEnv = tEnv();
+        catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
+
+        String dataId1 = TestValuesTableFactory.registerData(TestData.data7());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE dim (\n"
+                                + "  x INT,\n"
+                                + "  y INT,\n"
+                                + "  z BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'data-id' = '%s',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")",
+                        dataId1));
+
+        String dataId2 = TestValuesTableFactory.registerData(TestData.data5());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE fact1 (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` INT,\n"
+                                + "  `d` VARCHAR,\n"
+                                + "  `e` BIGINT\n"
+                                + ") partitioned by (a)\n"
+                                + " WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'runtime-source' = 'NewSource',\n"
+                                + "  'partition-list' = 'a:1;a:2;a:3;a:4;a:5;',\n"
+                                + "  'dynamic-filtering-fields' = 'a',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId2));
+
+        String dataId3 = TestValuesTableFactory.registerData(TestData.data5());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE fact2 (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` INT,\n"
+                                + "  `d` VARCHAR,\n"
+                                + "  `e` BIGINT\n"
+                                + ") partitioned by (e, a)\n"
+                                + " WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'runtime-source' = 'NewSource',\n"
+                                + "  'partition-list' = 'e:1,a:1;e:1,a:2;e:1,a:4;e:1,a:5;e:2,a:2;e:2,a:3;e:2,a:4;e:2,a:5;e:3,a:3;e:3,a:5;',\n"
+                                + "  'dynamic-filtering-fields' = 'a;e',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId3));
+    }
+
+    @Test
+    public void testSimpleDynamicFiltering() {
+        checkResult(
+                "SELECT * FROM fact1, dim WHERE x = a AND z = 2",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(2, 2, 1, "Hallo Welt", 2, 2, 2, 2),
+                                Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 2, 2),
+                                Row.of(3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 3, 2),
+                                Row.of(3, 5, 4, "ABC", 2, 3, 3, 2),
+                                Row.of(3, 6, 5, "BCD", 3, 3, 3, 2),
+                                Row.of(4, 10, 9, "FGH", 2, 4, 5, 2),
+                                Row.of(4, 10, 9, "FGH", 2, 4, 7, 2),
+                                Row.of(4, 7, 6, "CDE", 2, 4, 5, 2),
+                                Row.of(4, 7, 6, "CDE", 2, 4, 7, 2),
+                                Row.of(4, 8, 7, "DEF", 1, 4, 5, 2),
+                                Row.of(4, 8, 7, "DEF", 1, 4, 7, 2),
+                                Row.of(4, 9, 8, "EFG", 1, 4, 5, 2),
+                                Row.of(4, 9, 8, "EFG", 1, 4, 7, 2),
+                                Row.of(5, 11, 10, "GHI", 1, 5, 9, 2),
+                                Row.of(5, 12, 11, "HIJ", 3, 5, 9, 2),
+                                Row.of(5, 13, 12, "IJK", 3, 5, 9, 2),
+                                Row.of(5, 14, 13, "JKL", 2, 5, 9, 2),
+                                Row.of(5, 15, 14, "KLM", 2, 5, 9, 2))),
+                false);
+    }
+
+    @Test
+    public void testDynamicFilteringChainWithMultipleInput() throws Exception {
+        String dataId1 = TestValuesTableFactory.registerData(TestData.data7());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE dim2 (\n"
+                                + "  x INT,\n"
+                                + "  y INT,\n"
+                                + "  z BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'data-id' = '%s',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")",
+                        dataId1));
+
+        catalog.alterTableStatistics(
+                new ObjectPath(tEnv.getCurrentDatabase(), "dim"),
+                new CatalogTableStatistics(1, 1, 1, 1),
+                false);
+        catalog.alterTableStatistics(
+                new ObjectPath(tEnv.getCurrentDatabase(), "dim2"),
+                new CatalogTableStatistics(100, 1, 1, 1),
+                false);
+        checkResult(
+                "SELECT * FROM fact1, dim, dim2 WHERE dim.x = fact1.a and dim2.y = fact1.a AND dim.z = 1",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(1, 1, 0, "Hallo", 1, 1, 0, 1, 2, 1, 1),
+                                Row.of(2, 2, 1, "Hallo Welt", 2, 2, 1, 1, 2, 2, 2),
+                                Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1, 2, 2, 2),
+                                Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 3, 4, 3),
+                                Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 3, 4, 3),
+                                Row.of(4, 8, 7, "DEF", 1, 4, 6, 1, 3, 4, 3),
+                                Row.of(4, 9, 8, "EFG", 1, 4, 6, 1, 3, 4, 3),
+                                Row.of(5, 11, 10, "GHI", 1, 5, 8, 1, 4, 5, 2),
+                                Row.of(5, 12, 11, "HIJ", 3, 5, 8, 1, 4, 5, 2),
+                                Row.of(5, 13, 12, "IJK", 3, 5, 8, 1, 4, 5, 2),
+                                Row.of(5, 14, 13, "JKL", 2, 5, 8, 1, 4, 5, 2),
+                                Row.of(5, 15, 14, "KLM", 2, 5, 8, 1, 4, 5, 2))),
+                false);
+    }
+
+    @Test
+    public void testDynamicFilteringCannotChainWithMultipleInput() {
+        checkResult(
+                "SELECT * FROM fact1, dim, fact2 WHERE x = fact1.a and fact2.a = fact1.a AND z = 1 and fact1.e = 2 and fact2.e = 1",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(
+                                        2,
+                                        2,
+                                        1,
+                                        "Hallo Welt",
+                                        2,
+                                        2,
+                                        1,
+                                        1,
+                                        2,
+                                        3,
+                                        2,
+                                        "Hallo Welt wie",
+                                        1),
+                                Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 4, 8, 7, "DEF", 1),
+                                Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 4, 9, 8, "EFG", 1),
+                                Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 4, 8, 7, "DEF", 1),
+                                Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 4, 9, 8, "EFG", 1),
+                                Row.of(5, 14, 13, "JKL", 2, 5, 8, 1, 5, 11, 10, "GHI", 1),
+                                Row.of(5, 15, 14, "KLM", 2, 5, 8, 1, 5, 11, 10, "GHI", 1))),
+                false);
+    }
+
+    @Test
+    public void testReuseDimSide() {
+        checkResult(
+                "SELECT * FROM fact1, dim WHERE x = a AND z = 1 and b = 3"
+                        + "UNION ALL "
+                        + "SELECT * FROM fact2, dim WHERE x = a AND z = 1 and b = 2",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(2, 2, 1, "Hallo Welt", 2, 2, 1, 1),
+                                Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1))),
+                false);
+    }
+
+    @Test
+    public void testDynamicFilteringWithStaticPartitionPruning() {
+        checkResult(
+                "SELECT * FROM fact2, dim WHERE x = a and e = z AND y < 5 and a = 3",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 3, 2),
+                                Row.of(3, 5, 4, "ABC", 2, 3, 3, 2),
+                                Row.of(3, 6, 5, "BCD", 3, 3, 4, 3))),
+                false);
+    }
+
+    @Test
+    public void testMultiplePartitionKeysWithFullKey() {
+        checkResult(
+                "SELECT * FROM fact2, dim WHERE x = a AND z = e and y = 1",
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1))),
+                false);
+    }
+
+    @Test
+    public void testMultiplePartitionKeysWithPartialKey() {
+        checkResult(
+                "SELECT * FROM fact2, dim WHERE z = e and y = 1",
+                JavaScalaConversionUtil.toScala(
+                        Arrays.asList(
+                                Row.of(1, 1, 0, "Hallo", 1, 2, 1, 1),
+                                Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1),
+                                Row.of(4, 8, 7, "DEF", 1, 2, 1, 1),
+                                Row.of(4, 9, 8, "EFG", 1, 2, 1, 1),
+                                Row.of(5, 11, 10, "GHI", 1, 2, 1, 1))),
+                false);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml
new file mode 100644
index 00000000000..75d522ca41a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml
@@ -0,0 +1,713 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testDynamicFilteringWithStaticPartitionPruning">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
++- LogicalFilter(condition=[AND(=($3, $7), >($4, 10), >(CAST($3):BIGINT, 1))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])
+:        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+   +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+      +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])
+:        +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+   +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "DynamicFilteringDataCollector[]",
+    "pact" : "Operator",
+    "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDuplicateFactTables">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7], a10=[$8], b10=[$9], c10=[$10], p10=[$11])
++- LogicalJoin(condition=[=($5, $12)], joinType=[inner])
+   :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+   :  +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+   :     +- LogicalJoin(condition=[true], joinType=[inner])
+   :        :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+   +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], b10=[CAST($1):BIGINT])
+      +- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+
+== Optimized Physical Plan ==
+Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])
++- HashJoin(joinType=[InnerJoin], where=[=(y, b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])
+   :- Exchange(distribution=[hash[y]])
+   :  +- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+   :     :- Exchange(distribution=[hash[p1]])
+   :     :  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+   :     :     +- DynamicFilteringDataCollector(fields=[p])
+   :     :        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+   :     :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+   :     +- Exchange(distribution=[hash[p]])
+   :        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+   +- Exchange(distribution=[hash[b10]])
+      +- Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])
+         +- TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+
+== Optimized Execution Plan ==
+Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])
++- HashJoin(joinType=[InnerJoin], where=[(y = b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])
+   :- Exchange(distribution=[hash[y]])
+   :  +- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+   :     :- Exchange(distribution=[hash[p1]])
+   :     :  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+   :     :     +- DynamicFilteringDataCollector(fields=[p])
+   :     :        +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+   :     :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+   :     +- Exchange(distribution=[hash[p]])
+   :        +- Reused(reference_id=[1])
+   +- Exchange(distribution=[hash[b10]])
+      +- Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])
+         +- TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "DynamicFilteringDataCollector[]",
+    "pact" : "Operator",
+    "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(y = b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[y]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[b10]",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDynamicFilteringWithMultipleInput">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7], a2=[$8], b2=[$9], c2=[$10], p2=[$11])
++- LogicalFilter(condition=[AND(=($3, $7), =($3, $11), >($4, 10))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, fact2]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])
+:- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:  :- Exchange(distribution=[hash[p1]])
+:  :  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:  :     +- DynamicFilteringDataCollector(fields=[p])
+:  :        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:  :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+:  +- Exchange(distribution=[hash[p]])
+:     +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:        +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p2]])
+   +- TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+
+== Optimized Execution Plan ==
+MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(p1 = p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\n:  :- [#2] Exchange(distribution=[hash[p1]])\n:  +- [#3] Exchange(distribution=[hash[p]])\n+- [#1] Exchange(distribution=[hash[p2]])\n])
+:- Exchange(distribution=[hash[p2]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])
+:        +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+   +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: fact2[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "DynamicFilteringDataCollector[]",
+    "pact" : "Operator",
+    "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "MultipleInput[]",
+    "pact" : "Operator",
+    "contents" : "[]:MultipleInput(readOrder=[2,1,0], members=[\\nHashJoin(joinType=[InnerJoin], where=[(p1 = p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])\\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\\n:  :- [#2] Exchange(distribution=[hash[p1]])\\n:  +- [#3] Exchange(distribution=[hash[p]])\\n+- [#1] Exchange(distribution=[hash[p2]])\\n])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p2]",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLegacySource">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], p1=[$4], x=[$5], y=[$6], z=[$7], p=[$8])
++- LogicalFilter(condition=[AND(=($4, $8), >($5, 10))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, legacy_source]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])
++- Exchange(distribution=[hash[p]])
+   +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+      +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])
++- Exchange(distribution=[hash[p]])
+   +- Calc(select=[x, y, z, p], where=[(x > 10)])
+      +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: legacy_source[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimpleDynamicFiltering">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
++- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])
+:        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+   +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+      +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])
+:        +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+   +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "DynamicFilteringDataCollector[]",
+    "pact" : "Operator",
+    "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReuseDimSide">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+:- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+:  +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+:     +- LogicalJoin(condition=[true], joinType=[inner])
+:        :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
++- LogicalProject(a2=[$0], b2=[$1], c2=[$2], p2=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+   +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+      +- LogicalJoin(condition=[true], joinType=[inner])
+         :- LogicalTableScan(table=[[default_catalog, default_database, fact2]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+Union(all=[true], union=[a1, b1, c1, p1, x, y, z, p])
+:- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:  :- Exchange(distribution=[hash[p1]])
+:  :  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:  :     +- DynamicFilteringDataCollector(fields=[p])
+:  :        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:  :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+:  +- Exchange(distribution=[hash[p]])
+:     +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+:        +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- HashJoin(joinType=[InnerJoin], where=[=(p2, p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])
+   :- Exchange(distribution=[hash[p2]])
+   :  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+   :     +- DynamicFilteringDataCollector(fields=[p])
+   :        +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+   +- Exchange(distribution=[hash[p]])
+      +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+         +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+MultipleInput(readOrder=[1,0,1,0], members=[\nUnion(all=[true], union=[a1, b1, c1, p1, x, y, z, p])\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\n:  :- [#1] Exchange(distribution=[hash[p1]])\n:  +- [#2] Exchange(distribution=[hash[p]])\n+- HashJoin(joinType=[InnerJoin], where=[(p2 = p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])\n   :- [#3] Exchange(distribution=[hash[p2]])\n   +- [#2] Exchange(distribution=[hash[p]])\n])
+:- Exchange(distribution=[hash[p1]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+:     +- DynamicFilteringDataCollector(fields=[p])(reuse_id=[2])
+:        +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+:           +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+:- Exchange(distribution=[hash[p]])(reuse_id=[3])
+:  +- Reused(reference_id=[1])
+:- Exchange(distribution=[hash[p2]])
+:  +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+:     +- Reused(reference_id=[2])
++- Reused(reference_id=[3])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: fact1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Source: dim[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "DynamicFilteringDataCollector[]",
+    "pact" : "Operator",
+    "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: fact2[]",
+    "pact" : "Data Source",
+    "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])",
+    "parallelism" : 10
+  }, {
+    "id" : ,
+    "type" : "Order-Enforcer",
+    "pact" : "Operator",
+    "contents" : "Order-Enforcer",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "MultipleInput[]",
+    "pact" : "Operator",
+    "contents" : "[]:MultipleInput(readOrder=[1,0,1,0], members=[\\nUnion(all=[true], union=[a1, b1, c1, p1, x, y, z, p])\\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\\n:  :- [#1] Exchange(distribution=[hash[p1]])\\n:  +- [#2] Exchange(distribution=[hash[p]])\\n+- HashJoin(joinType=[InnerJoin], where=[(p2 = p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])\\n   :- [#3] Exchange(distribution=[hash[p2]])\\n   +- [#2] Excha [...]
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p1]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p]",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH[p2]",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
index 5552fb5ba70..d2d13adb880 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
@@ -957,6 +957,114 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, id
       "side" : "second"
     } ]
   } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMatch[isNameSimplifyEnabled=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "pact" : "Data Source",
+    "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "pact" : "Operator",
+    "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMatch[isNameSimplifyEnabled=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: MyTable[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Match[]",
+    "pact" : "Operator",
+    "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
 }]]>
     </Resource>
   </TestCase>
@@ -1131,15 +1239,15 @@ NestedLoopJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, a0, b
 {
   "nodes" : [ {
     "id" : ,
-    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
     "pact" : "Data Source",
-    "contents" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+    "contents" : "TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
     "parallelism" : 1
   }, {
     "id" : ,
-    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
     "pact" : "Data Source",
-    "contents" : "TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+    "contents" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
     "parallelism" : 1
   }, {
     "id" : ,
@@ -1185,15 +1293,15 @@ NestedLoopJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, a0, b
 {
   "nodes" : [ {
     "id" : ,
-    "type" : "Source: B[]",
+    "type" : "Source: A[]",
     "pact" : "Data Source",
-    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
     "parallelism" : 1
   }, {
     "id" : ,
-    "type" : "Source: A[]",
+    "type" : "Source: B[]",
     "pact" : "Data Source",
-    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
     "parallelism" : 1
   }, {
     "id" : ,
@@ -2093,114 +2201,6 @@ Calc(select=[b, w$end AS window_end, EXPR$2])
       "side" : "second"
     } ]
   } ]
-}]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testMatch[isNameSimplifyEnabled=false]">
-    <Resource name="explain">
-      <![CDATA[== Abstract Syntax Tree ==
-LogicalProject(aid=[$0], bid=[$1], cid=[$2])
-+- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-== Optimized Physical Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
-   +- Exchange(distribution=[single])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Optimized Execution Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
-   +- Exchange(distribution=[single])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Physical Execution Plan ==
-{
-  "nodes" : [ {
-    "id" : ,
-    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
-    "pact" : "Data Source",
-    "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
-    "parallelism" : 1
-  }, {
-    "id" : ,
-    "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
-    "pact" : "Operator",
-    "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
-    "parallelism" : 1,
-    "predecessors" : [ {
-      "id" : ,
-      "ship_strategy" : "GLOBAL",
-      "side" : "second"
-    } ]
-  }, {
-    "id" : ,
-    "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
-    "pact" : "Operator",
-    "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
-    "parallelism" : 1,
-    "predecessors" : [ {
-      "id" : ,
-      "ship_strategy" : "FORWARD",
-      "side" : "second"
-    } ]
-  } ]
-}]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testMatch[isNameSimplifyEnabled=true]">
-    <Resource name="explain">
-      <![CDATA[== Abstract Syntax Tree ==
-LogicalProject(aid=[$0], bid=[$1], cid=[$2])
-+- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-== Optimized Physical Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
-   +- Exchange(distribution=[single])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Optimized Execution Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
-   +- Exchange(distribution=[single])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Physical Execution Plan ==
-{
-  "nodes" : [ {
-    "id" : ,
-    "type" : "Source: MyTable[]",
-    "pact" : "Data Source",
-    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
-    "parallelism" : 1
-  }, {
-    "id" : ,
-    "type" : "Calc[]",
-    "pact" : "Operator",
-    "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
-    "parallelism" : 1,
-    "predecessors" : [ {
-      "id" : ,
-      "ship_strategy" : "GLOBAL",
-      "side" : "second"
-    } ]
-  }, {
-    "id" : ,
-    "type" : "Match[]",
-    "pact" : "Operator",
-    "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
-    "parallelism" : 1,
-    "predecessors" : [ {
-      "id" : ,
-      "ship_strategy" : "FORWARD",
-      "side" : "second"
-    } ]
-  } ]
 }]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index c95d3c35b27..2ac34b05e05 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -521,6 +521,22 @@ object TestData {
 
   val nullablesOfDuplicateData5 = Array(true, true, true, true, true)
 
+  lazy val data7 = Seq(
+    row(1, 0, 1L),
+    row(2, 1, 1L),
+    row(2, 2, 2L),
+    row(3, 3, 2L),
+    row(3, 4, 3L),
+    row(4, 5, 2L),
+    row(4, 6, 1L),
+    row(4, 7, 2L),
+    row(5, 8, 1L),
+    row(5, 9, 2L),
+    row(5, 10, 3L),
+    row(6, 11, 2L),
+    row(6, 11, 4L)
+  )
+
   lazy val numericData: Seq[Row] = Seq(
     row(1, 1L, 1.0f, 1.0d, JBigDecimal.valueOf(1)),
     row(2, 2L, 2.0f, 2.0d, JBigDecimal.valueOf(2)),