Posted to commits@flink.apache.org by tw...@apache.org on 2023/01/25 08:40:21 UTC

[flink] branch master updated: [FLINK-29215][table-planner] Use config based constructors for converter rules instead of deprecated

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ce8cb1f2bd [FLINK-29215][table-planner] Use config based constructors for converter rules instead of deprecated
0ce8cb1f2bd is described below

commit 0ce8cb1f2bd13c7cfca8b777972db0cbaa99af46
Author: Sergey Nuyanzin <sn...@gmail.com>
AuthorDate: Tue Aug 23 11:12:24 2022 +0200

    [FLINK-29215][table-planner] Use config based constructors for converter rules instead of deprecated
    
    This closes #20773.
---
 .../batch/BatchPhysicalPythonAggregateRule.java    | 20 +++++++-----
 .../batch/BatchPhysicalPythonCorrelateRule.java    | 18 +++++++----
 .../physical/common/CommonPhysicalMatchRule.java   |  2 +-
 .../stream/StreamPhysicalPythonCorrelateRule.java  | 18 +++++++----
 .../StreamPhysicalPythonGroupAggregateRule.java    | 18 +++++++----
 ...treamPhysicalPythonGroupTableAggregateRule.java | 18 +++++++----
 ...reamPhysicalPythonGroupWindowAggregateRule.java | 19 ++++++-----
 .../StreamPhysicalPythonOverAggregateRule.java     | 18 +++++++----
 .../plan/nodes/logical/FlinkLogicalAggregate.scala | 37 ++++++++++++++--------
 .../plan/nodes/logical/FlinkLogicalCalc.scala      | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalCorrelate.scala | 17 ++++++----
 .../logical/FlinkLogicalDataStreamTableScan.scala  | 15 +++++----
 .../nodes/logical/FlinkLogicalDistribution.scala   | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalExpand.scala    | 15 +++++----
 .../FlinkLogicalIntermediateTableScan.scala        | 16 +++++-----
 .../plan/nodes/logical/FlinkLogicalIntersect.scala | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalJoin.scala      | 15 +++++----
 .../nodes/logical/FlinkLogicalLegacySink.scala     | 15 +++++----
 .../FlinkLogicalLegacyTableSourceScan.scala        | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalMatch.scala     | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalMinus.scala     | 15 +++++----
 .../nodes/logical/FlinkLogicalOverAggregate.scala  | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalRank.scala      | 15 +++++----
 .../logical/FlinkLogicalScriptTransform.scala      | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalSink.scala      | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalSnapshot.scala  | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalSort.scala      | 29 +++++++++--------
 .../nodes/logical/FlinkLogicalTableAggregate.scala | 15 +++++----
 .../logical/FlinkLogicalTableFunctionScan.scala    | 17 +++++-----
 .../logical/FlinkLogicalTableSourceScan.scala      | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalUnion.scala     | 15 +++++----
 .../plan/nodes/logical/FlinkLogicalValues.scala    | 15 +++++----
 .../logical/FlinkLogicalWatermarkAssigner.scala    | 15 +++++----
 .../logical/FlinkLogicalWindowAggregate.scala      | 15 +++++----
 .../logical/FlinkLogicalWindowTableAggregate.scala | 15 +++++----
 .../batch/BatchPhysicalBoundedStreamScanRule.scala | 15 +++++----
 .../physical/batch/BatchPhysicalCalcRule.scala     | 15 +++++----
 .../batch/BatchPhysicalCorrelateRule.scala         | 15 +++++----
 .../batch/BatchPhysicalDistributionRule.scala      | 16 +++++-----
 .../physical/batch/BatchPhysicalExpandRule.scala   | 15 +++++----
 .../BatchPhysicalIntermediateTableScanRule.scala   | 15 +++++----
 .../batch/BatchPhysicalLegacySinkRule.scala        | 15 +++++----
 .../BatchPhysicalLegacyTableSourceScanRule.scala   | 15 +++++----
 .../physical/batch/BatchPhysicalLimitRule.scala    | 15 +++++----
 .../batch/BatchPhysicalPythonCalcRule.scala        | 15 +++++----
 .../physical/batch/BatchPhysicalRankRule.scala     | 15 +++++----
 .../batch/BatchPhysicalScriptTransformRule.scala   | 15 +++++----
 .../batch/BatchPhysicalSingleRowJoinRule.scala     | 16 ++++++----
 .../physical/batch/BatchPhysicalSinkRule.scala     | 15 +++++----
 .../batch/BatchPhysicalSortLimitRule.scala         | 15 +++++----
 .../physical/batch/BatchPhysicalSortRule.scala     | 15 +++++----
 .../batch/BatchPhysicalTableSourceScanRule.scala   | 15 +++++----
 .../physical/batch/BatchPhysicalUnionRule.scala    | 15 +++++----
 .../physical/batch/BatchPhysicalValuesRule.scala   | 15 +++++----
 .../BatchPhysicalWindowTableFunctionRule.scala     | 15 +++++----
 .../physical/stream/StreamPhysicalCalcRule.scala   | 15 +++++----
 .../stream/StreamPhysicalCorrelateRule.scala       | 15 +++++----
 .../stream/StreamPhysicalDataStreamScanRule.scala  | 15 +++++----
 .../stream/StreamPhysicalDeduplicateRule.scala     | 15 +++++----
 .../physical/stream/StreamPhysicalExpandRule.scala | 15 +++++----
 .../stream/StreamPhysicalGroupAggregateRule.scala  | 15 +++++----
 .../StreamPhysicalGroupTableAggregateRule.scala    | 15 +++++----
 .../StreamPhysicalGroupWindowAggregateRule.scala   | 16 +++++-----
 ...reamPhysicalGroupWindowTableAggregateRule.scala | 15 +++++----
 .../StreamPhysicalIntermediateTableScanRule.scala  | 15 +++++----
 .../stream/StreamPhysicalLegacySinkRule.scala      | 15 +++++----
 .../StreamPhysicalLegacyTableSourceScanRule.scala  | 15 +++++----
 .../physical/stream/StreamPhysicalLimitRule.scala  | 15 +++++----
 .../stream/StreamPhysicalOverAggregateRule.scala   | 15 +++++----
 .../stream/StreamPhysicalPythonCalcRule.scala      | 15 +++++----
 .../physical/stream/StreamPhysicalRankRule.scala   | 18 +++++------
 .../physical/stream/StreamPhysicalSinkRule.scala   | 15 +++++----
 .../stream/StreamPhysicalSortLimitRule.scala       | 15 +++++----
 .../physical/stream/StreamPhysicalSortRule.scala   | 15 +++++----
 .../stream/StreamPhysicalTableSourceScanRule.scala | 15 +++++----
 .../stream/StreamPhysicalTemporalSortRule.scala    | 15 +++++----
 .../physical/stream/StreamPhysicalUnionRule.scala  | 15 +++++----
 .../physical/stream/StreamPhysicalValuesRule.scala | 15 +++++----
 .../StreamPhysicalWatermarkAssignerRule.scala      | 15 +++++----
 .../stream/StreamPhysicalWindowAggregateRule.scala | 15 +++++----
 .../StreamPhysicalWindowDeduplicateRule.scala      | 15 +++++----
 .../stream/StreamPhysicalWindowRankRule.scala      | 15 +++++----
 .../StreamPhysicalWindowTableFunctionRule.scala    | 15 +++++----
 83 files changed, 706 insertions(+), 597 deletions(-)
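
For readers skimming the diff below: the files listed above all apply the same kind of change, replacing the deprecated ConverterRule(Class, RelTrait, RelTrait, String) super-constructor with Calcite's config-based ConverterRule(Config) constructor, built via Config.INSTANCE.withConversion(...).withRuleFactory(...). The following is only a minimal sketch of the resulting shape; MyPhysicalRule and its convert() body are hypothetical placeholders, not part of this commit:

    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.convert.ConverterRule;
    import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
    import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;

    public class MyPhysicalRule extends ConverterRule {

        // Previously: new MyPhysicalRule() calling the deprecated
        // super(Class, RelTrait, RelTrait, String) constructor.
        public static final RelOptRule INSTANCE =
                new MyPhysicalRule(
                        Config.INSTANCE
                                .withConversion(
                                        FlinkLogicalAggregate.class,
                                        FlinkConventions.LOGICAL(),
                                        FlinkConventions.BATCH_PHYSICAL(),
                                        "MyPhysicalRule")
                                .withRuleFactory(MyPhysicalRule::new));

        private MyPhysicalRule(Config config) {
            super(config);
        }

        @Override
        public RelNode convert(RelNode rel) {
            // Hypothetical placeholder: the real rules in this commit build
            // the corresponding physical RelNode here; returning null means
            // "no conversion" in Calcite's ConverterRule contract.
            return null;
        }
    }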

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
index a4d1ca9747a..6552bb69df5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
@@ -55,14 +55,18 @@ import scala.collection.Seq;
  */
 public class BatchPhysicalPythonAggregateRule extends ConverterRule {
 
-    public static final RelOptRule INSTANCE = new BatchPhysicalPythonAggregateRule();
-
-    private BatchPhysicalPythonAggregateRule() {
-        super(
-                FlinkLogicalAggregate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.BATCH_PHYSICAL(),
-                "BatchPhysicalPythonAggregateRule");
+    public static final RelOptRule INSTANCE =
+            new BatchPhysicalPythonAggregateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalAggregate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.BATCH_PHYSICAL(),
+                                    "BatchPhysicalPythonAggregateRule")
+                            .withRuleFactory(BatchPhysicalPythonAggregateRule::new));
+
+    protected BatchPhysicalPythonAggregateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
index 0a81fb2aad5..7f3e1c5d9d7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
@@ -42,14 +42,18 @@ import scala.Some;
  */
 public class BatchPhysicalPythonCorrelateRule extends ConverterRule {
 
-    public static final RelOptRule INSTANCE = new BatchPhysicalPythonCorrelateRule();
+    public static final RelOptRule INSTANCE =
+            new BatchPhysicalPythonCorrelateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalCorrelate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.BATCH_PHYSICAL(),
+                                    "BatchPhysicalPythonCorrelateRule")
+                            .withRuleFactory(BatchPhysicalPythonCorrelateRule::new));
 
-    private BatchPhysicalPythonCorrelateRule() {
-        super(
-                FlinkLogicalCorrelate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.BATCH_PHYSICAL(),
-                "BatchPhysicalPythonCorrelateRule");
+    private BatchPhysicalPythonCorrelateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
index ad1b15e8ea8..5476fbbbebe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -52,7 +52,7 @@ public abstract class CommonPhysicalMatchRule extends ConverterRule {
 
     public CommonPhysicalMatchRule(
             Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
-        super(clazz, in, out, descriptionPrefix);
+        super(Config.INSTANCE.as(Config.class).withConversion(clazz, in, out, descriptionPrefix));
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
index 3fcbe0f3f96..637cc08a594 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
@@ -42,14 +42,18 @@ import scala.Some;
  */
 public class StreamPhysicalPythonCorrelateRule extends ConverterRule {
 
-    public static final RelOptRule INSTANCE = new StreamPhysicalPythonCorrelateRule();
+    public static final RelOptRule INSTANCE =
+            new StreamPhysicalPythonCorrelateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalCorrelate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.STREAM_PHYSICAL(),
+                                    "StreamPhysicalPythonCorrelateRule")
+                            .withRuleFactory(StreamPhysicalPythonCorrelateRule::new));
 
-    private StreamPhysicalPythonCorrelateRule() {
-        super(
-                FlinkLogicalCorrelate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.STREAM_PHYSICAL(),
-                "StreamPhysicalPythonCorrelateRule");
+    private StreamPhysicalPythonCorrelateRule(Config config) {
+        super(config);
     }
 
     // find only calc and table function
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
index 602bdefeb42..acae431f2a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
@@ -43,14 +43,18 @@ import java.util.List;
  */
 public class StreamPhysicalPythonGroupAggregateRule extends ConverterRule {
 
-    public static final RelOptRule INSTANCE = new StreamPhysicalPythonGroupAggregateRule();
+    public static final RelOptRule INSTANCE =
+            new StreamPhysicalPythonGroupAggregateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalAggregate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.STREAM_PHYSICAL(),
+                                    "StreamPhysicalPythonGroupAggregateRule")
+                            .withRuleFactory(StreamPhysicalPythonGroupAggregateRule::new));
 
-    public StreamPhysicalPythonGroupAggregateRule() {
-        super(
-                FlinkLogicalAggregate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.STREAM_PHYSICAL(),
-                "StreamPhysicalPythonGroupAggregateRule");
+    public StreamPhysicalPythonGroupAggregateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
index 81bd14658ce..cbe6fbcc7de 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
@@ -42,14 +42,18 @@ import java.util.List;
  */
 public class StreamPhysicalPythonGroupTableAggregateRule extends ConverterRule {
 
-    public static final RelOptRule INSTANCE = new StreamPhysicalPythonGroupTableAggregateRule();
+    public static final RelOptRule INSTANCE =
+            new StreamPhysicalPythonGroupTableAggregateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalTableAggregate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.STREAM_PHYSICAL(),
+                                    "StreamPhysicalPythonGroupTableAggregateRule")
+                            .withRuleFactory(StreamPhysicalPythonGroupTableAggregateRule::new));
 
-    public StreamPhysicalPythonGroupTableAggregateRule() {
-        super(
-                FlinkLogicalTableAggregate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.STREAM_PHYSICAL(),
-                "StreamPhysicalPythonGroupTableAggregateRule");
+    public StreamPhysicalPythonGroupTableAggregateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index 5f080d76da7..cdb2f54d3cb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -50,14 +50,17 @@ import java.util.List;
 public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule {
 
     public static final StreamPhysicalPythonGroupWindowAggregateRule INSTANCE =
-            new StreamPhysicalPythonGroupWindowAggregateRule();
-
-    private StreamPhysicalPythonGroupWindowAggregateRule() {
-        super(
-                FlinkLogicalWindowAggregate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.STREAM_PHYSICAL(),
-                "StreamPhysicalPythonGroupWindowAggregateRule");
+            new StreamPhysicalPythonGroupWindowAggregateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalWindowAggregate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.STREAM_PHYSICAL(),
+                                    "StreamPhysicalPythonGroupWindowAggregateRule")
+                            .withRuleFactory(StreamPhysicalPythonGroupWindowAggregateRule::new));
+
+    private StreamPhysicalPythonGroupWindowAggregateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
index 86da96b9caa..934daf42e1c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
@@ -41,15 +41,19 @@ import java.util.List;
  * StreamPhysicalPythonOverAggregate}.
  */
 public class StreamPhysicalPythonOverAggregateRule extends ConverterRule {
+
     public static final StreamPhysicalPythonOverAggregateRule INSTANCE =
-            new StreamPhysicalPythonOverAggregateRule();
+            new StreamPhysicalPythonOverAggregateRule(
+                    Config.INSTANCE
+                            .withConversion(
+                                    FlinkLogicalOverAggregate.class,
+                                    FlinkConventions.LOGICAL(),
+                                    FlinkConventions.STREAM_PHYSICAL(),
+                                    "StreamPhysicalPythonOverAggregateRule")
+                            .withRuleFactory(StreamPhysicalPythonOverAggregateRule::new));
 
-    private StreamPhysicalPythonOverAggregateRule() {
-        super(
-                FlinkLogicalOverAggregate.class,
-                FlinkConventions.LOGICAL(),
-                FlinkConventions.STREAM_PHYSICAL(),
-                "StreamPhysicalPythonOverAggregateRule");
+    private StreamPhysicalPythonOverAggregateRule(Config config) {
+        super(config);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 27d03f9470d..0c9c1f2ca03 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -94,12 +94,8 @@ class FlinkLogicalAggregate(
   }
 }
 
-private class FlinkLogicalAggregateBatchConverter
-  extends ConverterRule(
-    classOf[LogicalAggregate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalAggregateBatchConverter") {
+private class FlinkLogicalAggregateBatchConverter(config: ConverterRule.Config)
+  extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg = call.rel(0).asInstanceOf[LogicalAggregate]
@@ -126,12 +122,8 @@ private class FlinkLogicalAggregateBatchConverter
   }
 }
 
-private class FlinkLogicalAggregateStreamConverter
-  extends ConverterRule(
-    classOf[LogicalAggregate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalAggregateStreamConverter") {
+private class FlinkLogicalAggregateStreamConverter(config: ConverterRule.Config)
+  extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg = call.rel(0).asInstanceOf[LogicalAggregate]
@@ -152,8 +144,25 @@ private class FlinkLogicalAggregateStreamConverter
 }
 
 object FlinkLogicalAggregate {
-  val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter()
-  val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter()
+
+  val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter(
+    ConverterRule.Config.INSTANCE
+      .withConversion(
+        classOf[LogicalAggregate],
+        Convention.NONE,
+        FlinkConventions.LOGICAL,
+        "FlinkLogicalAggregateBatchConverter")
+      .withRuleFactory(
+        (config: ConverterRule.Config) => new FlinkLogicalAggregateBatchConverter(config)))
+  val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter(
+    ConverterRule.Config.INSTANCE
+      .withConversion(
+        classOf[LogicalAggregate],
+        Convention.NONE,
+        FlinkConventions.LOGICAL,
+        "FlinkLogicalAggregateStreamConverter")
+      .withRuleFactory(
+        (config: ConverterRule.Config) => new FlinkLogicalAggregateStreamConverter(config)))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
index 95a4489cd69..eba39f46de6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.logical.LogicalCalc
 import org.apache.calcite.rel.metadata.RelMdCollation
@@ -49,12 +50,7 @@ class FlinkLogicalCalc(
 
 }
 
-private class FlinkLogicalCalcConverter
-  extends ConverterRule(
-    classOf[LogicalCalc],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalCalcConverter") {
+private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val calc = rel.asInstanceOf[LogicalCalc]
@@ -64,7 +60,12 @@ private class FlinkLogicalCalcConverter
 }
 
 object FlinkLogicalCalc {
-  val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalCalc],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalCalcConverter"))
 
   def create(input: RelNode, calcProgram: RexProgram): FlinkLogicalCalc = {
     val cluster = input.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
index 8378c9c70aa..aba0e4051e8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -61,12 +61,7 @@ class FlinkLogicalCorrelate(
 
 }
 
-class FlinkLogicalCorrelateConverter
-  extends ConverterRule(
-    classOf[LogicalCorrelate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalCorrelateConverter") {
+class FlinkLogicalCorrelateConverter(config: ConverterRule.Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val correlate = rel.asInstanceOf[LogicalCorrelate]
@@ -82,7 +77,15 @@ class FlinkLogicalCorrelateConverter
 }
 
 object FlinkLogicalCorrelate {
-  val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter(
+    ConverterRule.Config.INSTANCE
+      .withConversion(
+        classOf[LogicalCorrelate],
+        Convention.NONE,
+        FlinkConventions.LOGICAL,
+        "FlinkLogicalCorrelateConverter")
+      .withRuleFactory(
+        (config: ConverterRule.Config) => new FlinkLogicalCorrelateConverter(config)))
 
   def create(
       left: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index 4759ab5c8ab..76738aa2bc9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableScan
@@ -66,12 +67,7 @@ class FlinkLogicalDataStreamTableScan(
 
 }
 
-class FlinkLogicalDataStreamTableScanConverter
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalDataStreamTableScanConverter") {
+class FlinkLogicalDataStreamTableScanConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0)
@@ -86,7 +82,12 @@ class FlinkLogicalDataStreamTableScanConverter
 }
 
 object FlinkLogicalDataStreamTableScan {
-  val CONVERTER = new FlinkLogicalDataStreamTableScanConverter
+  val CONVERTER = new FlinkLogicalDataStreamTableScanConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalDataStreamTableScanConverter"))
 
   def isDataStreamTableScan(scan: TableScan): Boolean = {
     val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
index 7c2b632e410..a537ce32f64 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, SingleRel}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import java.util.{List => JList}
 
@@ -41,12 +42,7 @@ class FlinkLogicalDistribution(
     new FlinkLogicalDistribution(getCluster, traitSet, inputs.get(0), collation, distKeys)
 }
 
-class FlinkLogicalDistributionBatchConverter
-  extends ConverterRule(
-    classOf[LogicalDistribution],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalDistributionBatchConverter") {
+class FlinkLogicalDistributionBatchConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val distribution = rel.asInstanceOf[LogicalDistribution]
@@ -56,7 +52,12 @@ class FlinkLogicalDistributionBatchConverter
 }
 
 object FlinkLogicalDistribution {
-  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalDistributionBatchConverter
+  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalDistributionBatchConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalDistribution],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalDistributionBatchConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
index c9398d1f186..4cc0de30e7a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, LogicalExpand}
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexNode
 
 import java.util
@@ -46,12 +47,7 @@ class FlinkLogicalExpand(
 
 }
 
-private class FlinkLogicalExpandConverter
-  extends ConverterRule(
-    classOf[LogicalExpand],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalExpandConverter") {
+private class FlinkLogicalExpandConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val expand = rel.asInstanceOf[LogicalExpand]
@@ -61,7 +57,12 @@ private class FlinkLogicalExpandConverter
 }
 
 object FlinkLogicalExpand {
-  val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalExpand],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalExpandConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
index bf7594668ad..8ffd6c33577 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 
@@ -45,12 +46,7 @@ class FlinkLogicalIntermediateTableScan(
 
 }
 
-class FlinkLogicalIntermediateTableScanConverter
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalIntermediateTableScanConverter") {
+class FlinkLogicalIntermediateTableScanConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0)
@@ -65,8 +61,12 @@ class FlinkLogicalIntermediateTableScanConverter
 }
 
 object FlinkLogicalIntermediateTableScan {
-
-  val CONVERTER = new FlinkLogicalIntermediateTableScanConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalIntermediateTableScanConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalIntermediateTableScanConverter"))
 
   def create(
       cluster: RelOptCluster,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
index 78468c8df35..ecb201258ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.{Intersect, SetOp}
 import org.apache.calcite.rel.logical.LogicalIntersect
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -59,12 +60,7 @@ class FlinkLogicalIntersect(
 
 }
 
-private class FlinkLogicalIntersectConverter
-  extends ConverterRule(
-    classOf[LogicalIntersect],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalIntersectConverter") {
+private class FlinkLogicalIntersectConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val intersect = rel.asInstanceOf[LogicalIntersect]
@@ -76,7 +72,12 @@ private class FlinkLogicalIntersectConverter
 }
 
 object FlinkLogicalIntersect {
-  val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalIntersect],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalIntersectConverter"))
 
   def create(inputs: util.List[RelNode], all: Boolean): FlinkLogicalIntersect = {
     val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
index 8f1d42bd3c7..32dd6a7fbfd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalJoin
@@ -78,12 +79,7 @@ class FlinkLogicalJoin(
 }
 
 /** Support all joins. */
-private class FlinkLogicalJoinConverter
-  extends ConverterRule(
-    classOf[LogicalJoin],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalJoinConverter") {
+private class FlinkLogicalJoinConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val join = rel.asInstanceOf[LogicalJoin]
@@ -94,7 +90,12 @@ private class FlinkLogicalJoinConverter
 }
 
 object FlinkLogicalJoin {
-  val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalJoinConverter"))
 
   def create(
       left: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
index f825e84e861..00c79162e15 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.sinks.TableSink
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.hint.RelHint
 
 import java.util
@@ -61,12 +62,7 @@ class FlinkLogicalLegacySink(
 
 }
 
-private class FlinkLogicalLegacySinkConverter
-  extends ConverterRule(
-    classOf[LogicalLegacySink],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalLegacySinkConverter") {
+private class FlinkLogicalLegacySinkConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[LogicalLegacySink]
@@ -82,7 +78,12 @@ private class FlinkLogicalLegacySinkConverter
 }
 
 object FlinkLogicalLegacySink {
-  val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalLegacySink],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalLegacySinkConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
index 5f6abfea564..328e89bc509 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableScan
@@ -83,12 +84,7 @@ class FlinkLogicalLegacyTableSourceScan(
 
 }
 
-class FlinkLogicalLegacyTableSourceScanConverter
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalLegacyTableSourceScanConverter") {
+class FlinkLogicalLegacyTableSourceScanConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0)
@@ -103,7 +99,12 @@ class FlinkLogicalLegacyTableSourceScanConverter
 }
 
 object FlinkLogicalLegacyTableSourceScan {
-  val CONVERTER = new FlinkLogicalLegacyTableSourceScanConverter
+  val CONVERTER = new FlinkLogicalLegacyTableSourceScanConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalLegacyTableSourceScanConverter"))
 
   def isTableSourceScan(scan: TableScan): Boolean = {
     val tableSourceTable = scan.getTable.unwrap(classOf[LegacyTableSourceTable[_]])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
index d02f7cb3e0d..993abb5956f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Match
 import org.apache.calcite.rel.logical.LogicalMatch
 import org.apache.calcite.rex.RexNode
@@ -109,12 +110,7 @@ class FlinkLogicalMatch(
   }
 }
 
-private class FlinkLogicalMatchConverter
-  extends ConverterRule(
-    classOf[LogicalMatch],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalMatchConverter") {
+private class FlinkLogicalMatchConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val logicalMatch = rel.asInstanceOf[LogicalMatch]
@@ -141,5 +137,10 @@ private class FlinkLogicalMatchConverter
 }
 
 object FlinkLogicalMatch {
-  val CONVERTER: ConverterRule = new FlinkLogicalMatchConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalMatchConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalMatch],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalMatchConverter"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
index 330d19f5c7a..1f89eeaf2d6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.{Minus, SetOp}
 import org.apache.calcite.rel.logical.LogicalMinus
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -59,12 +60,7 @@ class FlinkLogicalMinus(
 
 }
 
-private class FlinkLogicalMinusConverter
-  extends ConverterRule(
-    classOf[LogicalMinus],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalMinusConverter") {
+private class FlinkLogicalMinusConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val minus = rel.asInstanceOf[LogicalMinus]
@@ -76,7 +72,12 @@ private class FlinkLogicalMinusConverter
 }
 
 object FlinkLogicalMinus {
-  val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalMinus],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalMinusConverter"))
 
   def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalMinus = {
     val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
index 2395ba1426d..0c9891f0f17 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Window
 import org.apache.calcite.rel.logical.LogicalWindow
 import org.apache.calcite.rel.metadata.RelMdCollation
@@ -62,12 +63,7 @@ class FlinkLogicalOverAggregate(
 
 }
 
-class FlinkLogicalOverAggregateConverter
-  extends ConverterRule(
-    classOf[LogicalWindow],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalOverAggregateConverter") {
+class FlinkLogicalOverAggregateConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val window = rel.asInstanceOf[LogicalWindow]
@@ -109,5 +105,10 @@ class FlinkLogicalOverAggregateConverter
 }
 
 object FlinkLogicalOverAggregate {
-  val CONVERTER = new FlinkLogicalOverAggregateConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalOverAggregateConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalWindow],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalOverAggregateConverter"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
index cde32de2c25..c7de2f3a24c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
@@ -83,12 +84,7 @@ class FlinkLogicalRank(
 
 }
 
-private class FlinkLogicalRankConverter
-  extends ConverterRule(
-    classOf[LogicalRank],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalRankConverter") {
+private class FlinkLogicalRankConverter(config: Config) extends ConverterRule(config) {
   override def convert(rel: RelNode): RelNode = {
     val rank = rel.asInstanceOf[LogicalRank]
     val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
@@ -105,7 +101,12 @@ private class FlinkLogicalRankConverter
 }
 
 object FlinkLogicalRank {
-  val CONVERTER: ConverterRule = new FlinkLogicalRankConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalRankConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalRank],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalRankConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
index 7a376564bd8..4d98e1086a9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitS
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import java.util.{List => JList}
 
@@ -53,12 +54,7 @@ class FlinkLogicalScriptTransform(
   override protected def deriveRowType: RelDataType = outDataType
 }
 
-class FlinkLogicalScriptTransformBatchConverter
-  extends ConverterRule(
-    classOf[LogicalScriptTransform],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalScriptTransformBatchConverter") {
+class FlinkLogicalScriptTransformBatchConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val scriptTransform = rel.asInstanceOf[LogicalScriptTransform]
@@ -74,7 +70,12 @@ class FlinkLogicalScriptTransformBatchConverter
 }
 
 object FlinkLogicalScriptTransform {
-  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalScriptTransformBatchConverter
+  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalScriptTransformBatchConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalScriptTransform],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalScriptTransformBatchConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
index be1ff8a3c5e..d739ac6c664 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalSink, Sink}
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.hint.RelHint
 
 import java.util
@@ -62,12 +63,7 @@ class FlinkLogicalSink(
 
 }
 
-private class FlinkLogicalSinkConverter
-  extends ConverterRule(
-    classOf[LogicalSink],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalSinkConverter") {
+private class FlinkLogicalSinkConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[LogicalSink]
@@ -83,7 +79,12 @@ private class FlinkLogicalSinkConverter
 }
 
 object FlinkLogicalSink {
-  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalSink],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalSinkConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
index 5904dffa413..bea50d3755b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Snapshot
 import org.apache.calcite.rel.logical.LogicalSnapshot
 import org.apache.calcite.rel.metadata.{RelMdCollation, RelMetadataQuery}
@@ -79,12 +80,7 @@ class FlinkLogicalSnapshot(
 
 }
 
-class FlinkLogicalSnapshotConverter
-  extends ConverterRule(
-    classOf[LogicalSnapshot],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalSnapshotConverter") {
+class FlinkLogicalSnapshotConverter(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val snapshot = rel.asInstanceOf[LogicalSnapshot]
@@ -95,7 +91,12 @@ class FlinkLogicalSnapshotConverter
 
 object FlinkLogicalSnapshot {
 
-  val CONVERTER = new FlinkLogicalSnapshotConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalSnapshotConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalSnapshot],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalSnapshotConverter"))
 
   def create(input: RelNode, period: RexNode): FlinkLogicalSnapshot = {
     val cluster = input.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
index df51b5b11ea..f0b8ed02bc5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Sort
 import org.apache.calcite.rel.logical.LogicalSort
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -80,12 +81,7 @@ class FlinkLogicalSort(
 
 }
 
-class FlinkLogicalSortStreamConverter
-  extends ConverterRule(
-    classOf[LogicalSort],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalSortStreamConverter") {
+class FlinkLogicalSortStreamConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val sort = rel.asInstanceOf[LogicalSort]
@@ -94,12 +90,7 @@ class FlinkLogicalSortStreamConverter
   }
 }
 
-class FlinkLogicalSortBatchConverter
-  extends ConverterRule(
-    classOf[LogicalSort],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalSortBatchConverter") {
+class FlinkLogicalSortBatchConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val sort = rel.asInstanceOf[LogicalSort]
@@ -126,8 +117,18 @@ class FlinkLogicalSortBatchConverter
 }
 
 object FlinkLogicalSort {
-  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter
-  val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter
+  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalSort],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalSortBatchConverter"))
+  val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalSort],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalSortStreamConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
index f9d21a637c9..c0074cc9e0c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate,
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.util.ImmutableBitSet
 
@@ -55,12 +56,7 @@ class FlinkLogicalTableAggregate(
   }
 }
 
-private class FlinkLogicalTableAggregateConverter
-  extends ConverterRule(
-    classOf[LogicalTableAggregate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalTableAggregateConverter") {
+private class FlinkLogicalTableAggregateConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalTableAggregate]
@@ -78,5 +74,10 @@ private class FlinkLogicalTableAggregateConverter
 }
 
 object FlinkLogicalTableAggregate {
-  val CONVERTER: ConverterRule = new FlinkLogicalTableAggregateConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalTableAggregateConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableAggregate],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalTableAggregateConverter"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index ff067e301e1..c402ab6c770 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -22,10 +22,11 @@ import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.TableSqlFunction
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 
-import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableFunctionScan
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rel.metadata.RelColumnMapping
@@ -78,12 +79,7 @@ class FlinkLogicalTableFunctionScan(
 
 }
 
-class FlinkLogicalTableFunctionScanConverter
-  extends ConverterRule(
-    classOf[LogicalTableFunctionScan],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalTableFunctionScanConverter") {
+class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val logicalTableFunction: LogicalTableFunctionScan = call.rel(0)
@@ -125,5 +121,10 @@ class FlinkLogicalTableFunctionScanConverter
 }
 
 object FlinkLogicalTableFunctionScan {
-  val CONVERTER = new FlinkLogicalTableFunctionScanConverter
+  val CONVERTER = new FlinkLogicalTableFunctionScanConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableFunctionScan],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalTableFunctionScanConverter"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index ba4f16dee43..fb176c99eca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableScan
@@ -107,12 +108,7 @@ class FlinkLogicalTableSourceScan(
 
 }
 
-class FlinkLogicalTableSourceScanConverter
-  extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalTableSourceScanConverter") {
+class FlinkLogicalTableSourceScanConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0)
@@ -127,7 +123,12 @@ class FlinkLogicalTableSourceScanConverter
 }
 
 object FlinkLogicalTableSourceScan {
-  val CONVERTER = new FlinkLogicalTableSourceScanConverter
+  val CONVERTER = new FlinkLogicalTableSourceScanConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalTableSourceScanConverter"))
 
   def isTableSourceScan(scan: TableScan): Boolean = {
     val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
index c7805e6bf2b..1bf479a7f65 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.{SetOp, Union}
 import org.apache.calcite.rel.logical.LogicalUnion
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -57,12 +58,7 @@ class FlinkLogicalUnion(
 
 }
 
-private class FlinkLogicalUnionConverter
-  extends ConverterRule(
-    classOf[LogicalUnion],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalUnionConverter") {
+private class FlinkLogicalUnionConverter(config: Config) extends ConverterRule(config) {
 
   /** Only translate UNION ALL. */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -80,7 +76,12 @@ private class FlinkLogicalUnionConverter
 }
 
 object FlinkLogicalUnion {
-  val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalUnion],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalUnionConverter"))
 
   def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalUnion = {
     val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
index ddd04d77b4c..b7539dc0b84 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.logical.LogicalValues
 import org.apache.calcite.rel.metadata.{RelMdCollation, RelMetadataQuery}
@@ -58,12 +59,7 @@ class FlinkLogicalValues(
 
 }
 
-private class FlinkLogicalValuesConverter
-  extends ConverterRule(
-    classOf[LogicalValues],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalValuesConverter") {
+private class FlinkLogicalValuesConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val values = rel.asInstanceOf[LogicalValues]
@@ -76,7 +72,12 @@ private class FlinkLogicalValuesConverter
 }
 
 object FlinkLogicalValues {
-  val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter()
+  val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalValues],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalValuesConverter"))
 
   def create(
       cluster: RelOptCluster,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
index 9885baabb38..795304a9f38 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalWatermarkAssign
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexNode
 
 /**
@@ -49,12 +50,7 @@ class FlinkLogicalWatermarkAssigner(
 
 }
 
-class FlinkLogicalWatermarkAssignerConverter
-  extends ConverterRule(
-    classOf[LogicalWatermarkAssigner],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalWatermarkAssignerConverter") {
+class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val watermark = rel.asInstanceOf[LogicalWatermarkAssigner]
@@ -67,7 +63,12 @@ class FlinkLogicalWatermarkAssignerConverter
 }
 
 object FlinkLogicalWatermarkAssigner {
-  val CONVERTER = new FlinkLogicalWatermarkAssignerConverter
+  val CONVERTER = new FlinkLogicalWatermarkAssignerConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalWatermarkAssigner],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalWatermarkAssignerConverter"))
 
   def create(
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index b45b4e86cad..c483d660f0a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.core.Aggregate.Group
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -74,12 +75,7 @@ class FlinkLogicalWindowAggregate(
 
 }
 
-class FlinkLogicalWindowAggregateConverter
-  extends ConverterRule(
-    classOf[LogicalWindowAggregate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalWindowAggregateConverter") {
+class FlinkLogicalWindowAggregateConverter(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
@@ -114,5 +110,10 @@ class FlinkLogicalWindowAggregateConverter
 }
 
 object FlinkLogicalWindowAggregate {
-  val CONVERTER = new FlinkLogicalWindowAggregateConverter
+  val CONVERTER: ConverterRule = new FlinkLogicalWindowAggregateConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalWindowAggregate],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalWindowAggregateConverter"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index 3d9c1b2d4db..a57e65bd8c6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.util.ImmutableBitSet
@@ -78,12 +79,7 @@ class FlinkLogicalWindowTableAggregate(
   }
 }
 
-class FlinkLogicalWindowTableAggregateConverter
-  extends ConverterRule(
-    classOf[LogicalWindowTableAggregate],
-    Convention.NONE,
-    FlinkConventions.LOGICAL,
-    "FlinkLogicalWindowTableAggregateConverter") {
+class FlinkLogicalWindowTableAggregateConverter(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalWindowTableAggregate]
@@ -103,5 +99,10 @@ class FlinkLogicalWindowTableAggregateConverter
 }
 
 object FlinkLogicalWindowTableAggregate {
-  val CONVERTER = new FlinkLogicalWindowTableAggregateConverter
+  val CONVERTER = new FlinkLogicalWindowTableAggregateConverter(
+    Config.INSTANCE.withConversion(
+      classOf[LogicalWindowTableAggregate],
+      Convention.NONE,
+      FlinkConventions.LOGICAL,
+      "FlinkLogicalWindowTableAggregateConverter"))
 }
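
The files below apply the same migration repeatedly: the deprecated multi-argument ConverterRule constructor is replaced by a constructor taking a ConverterRule.Config, and the conversion itself (source rel class, input trait, output trait, description) moves into Config.INSTANCE.withConversion in the rule's companion object. The following is a minimal sketch of that pattern, not code from this commit; MyConverterRule and MyLogicalRel are hypothetical placeholder names, while the Calcite calls and FlinkConventions.LOGICAL are the ones used in the surrounding diffs.

import org.apache.flink.table.planner.plan.nodes.FlinkConventions

import org.apache.calcite.plan.Convention
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config

// Hypothetical rule following the config-based constructor pattern.
class MyConverterRule(config: Config) extends ConverterRule(config) {
  override def convert(rel: RelNode): RelNode = {
    // Build and return the equivalent rel in the target convention here.
    // Returning null tells the planner that this rule cannot convert the input.
    null
  }
}

object MyConverterRule {
  // The conversion description now lives in the Config instead of constructor arguments.
  val INSTANCE: ConverterRule = new MyConverterRule(
    Config.INSTANCE.withConversion(
      classOf[MyLogicalRel], // hypothetical source rel class
      Convention.NONE,
      FlinkConventions.LOGICAL,
      "MyConverterRule"))
}

Rule sets keep referencing the INSTANCE field exactly as before; only how the rule instance is constructed changes.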
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
index c54a340deb9..4ce90a215f9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
@@ -25,14 +25,10 @@ import org.apache.flink.table.planner.plan.schema.DataStreamTable
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalDataStreamTableScan]] to [[BatchPhysicalBoundedStreamScan]]. */
-class BatchPhysicalBoundedStreamScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalDataStreamTableScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalBoundedStreamScanRule") {
+class BatchPhysicalBoundedStreamScanRule(config: Config) extends ConverterRule(config) {
 
   /** If the input is not a DataStreamTable, we want the TableScanRule to match instead */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -54,5 +50,10 @@ class BatchPhysicalBoundedStreamScanRule
 }
 
 object BatchPhysicalBoundedStreamScanRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalBoundedStreamScanRule
+  val INSTANCE: RelOptRule = new BatchPhysicalBoundedStreamScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalDataStreamTableScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalBoundedStreamScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
index 3751cf8cb09..5c6ed2f5603 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConverters._
 
 /** Rule that converts [[FlinkLogicalCalc]] to [[BatchPhysicalCalc]]. */
-class BatchPhysicalCalcRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCalc],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalCalcRule") {
+class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class BatchPhysicalCalcRule
 }
 
 object BatchPhysicalCalcRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalCalcRule
+  val INSTANCE: RelOptRule = new BatchPhysicalCalcRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCalc],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalCalcRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index 638a5785d6c..9f714f2057c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -26,14 +26,10 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexNode
 
-class BatchPhysicalCorrelateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCorrelate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalCorrelateRule") {
+class BatchPhysicalCorrelateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val join = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
@@ -86,5 +82,10 @@ class BatchPhysicalCorrelateRule
 }
 
 object BatchPhysicalCorrelateRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule
+  val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCorrelate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalCorrelateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
index 8782b7bf306..7ce64932ef0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
@@ -19,21 +19,16 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
 
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalDistribution
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort
 
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that matches [[FlinkLogicalDistribution]]. */
-class BatchPhysicalDistributionRule
-  extends ConverterRule(
-    classOf[FlinkLogicalDistribution],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchExecDistributionRule") {
+class BatchPhysicalDistributionRule(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val logicalDistribution = rel.asInstanceOf[FlinkLogicalDistribution]
@@ -61,5 +56,10 @@ class BatchPhysicalDistributionRule
 }
 
 object BatchPhysicalDistributionRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalDistributionRule
+  val INSTANCE: RelOptRule = new BatchPhysicalDistributionRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalDistribution],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchExecDistributionRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
index d131dd95bde..42a7b05c844 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExp
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalExpand]] to [[BatchPhysicalExpand]]. */
-class BatchPhysicalExpandRule
-  extends ConverterRule(
-    classOf[FlinkLogicalExpand],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalExpandRule") {
+class BatchPhysicalExpandRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val expand = rel.asInstanceOf[FlinkLogicalExpand]
@@ -47,5 +43,10 @@ class BatchPhysicalExpandRule
 }
 
 object BatchPhysicalExpandRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalExpandRule
+  val INSTANCE: RelOptRule = new BatchPhysicalExpandRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalExpand],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalExpandRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
index 1f84547d4fd..293283085bd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
@@ -24,17 +24,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalInt
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that converts [[FlinkLogicalIntermediateTableScan]] to
  * [[BatchPhysicalIntermediateTableScan]].
  */
-class BatchPhysicalIntermediateTableScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalIntermediateTableScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalIntermediateTableScanRule") {
+class BatchPhysicalIntermediateTableScanRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[FlinkLogicalIntermediateTableScan]
@@ -44,5 +40,10 @@ class BatchPhysicalIntermediateTableScanRule
 }
 
 object BatchPhysicalIntermediateTableScanRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalIntermediateTableScanRule
+  val INSTANCE: RelOptRule = new BatchPhysicalIntermediateTableScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalIntermediateTableScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalIntermediateTableScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
index 19e1e951e69..b5d9286c036 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
@@ -28,15 +28,11 @@ import org.apache.flink.table.sinks.PartitionableTableSink
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
-class BatchPhysicalLegacySinkRule
-  extends ConverterRule(
-    classOf[FlinkLogicalLegacySink],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalLegacySinkRule") {
+class BatchPhysicalLegacySinkRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[FlinkLogicalLegacySink]
@@ -92,5 +88,10 @@ class BatchPhysicalLegacySinkRule
 }
 
 object BatchPhysicalLegacySinkRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalLegacySinkRule
+  val INSTANCE: RelOptRule = new BatchPhysicalLegacySinkRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalLegacySink],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalLegacySinkRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
index 5f55d0b83cf..0ca49eb479c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.sources.StreamTableSource
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 
 /**
  * Rule that converts [[FlinkLogicalLegacyTableSourceScan]] to
  * [[BatchPhysicalLegacyTableSourceScan]].
  */
-class BatchPhysicalLegacyTableSourceScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalLegacyTableSourceScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalLegacyTableSourceScan") {
+class BatchPhysicalLegacyTableSourceScanRule(config: Config) extends ConverterRule(config) {
 
   /** Rule must only match if TableScan targets a bounded [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -66,5 +62,10 @@ class BatchPhysicalLegacyTableSourceScanRule
 }
 
 object BatchPhysicalLegacyTableSourceScanRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalLegacyTableSourceScanRule
+  val INSTANCE: RelOptRule = new BatchPhysicalLegacyTableSourceScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalLegacyTableSourceScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalLegacyTableSourceScan"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
index 8dd0d00973a..452bdfb7bd4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexLiteral
 import org.apache.calcite.sql.`type`.SqlTypeName
 
@@ -45,12 +46,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
  * }}}
  * when fetch is null.
  */
-class BatchPhysicalLimitRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalLimitRule") {
+class BatchPhysicalLimitRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -102,5 +98,10 @@ class BatchPhysicalLimitRule
 }
 
 object BatchPhysicalLimitRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalLimitRule
+  val INSTANCE: RelOptRule = new BatchPhysicalLimitRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalLimitRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
index 6217353b96b..abad1f05336 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConverters._
 
 /** Rule that converts [[FlinkLogicalCalc]] to [[BatchPhysicalPythonCalc]]. */
-class BatchPhysicalPythonCalcRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCalc],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalPythonCalcRule") {
+class BatchPhysicalPythonCalcRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class BatchPhysicalPythonCalcRule
 }
 
 object BatchPhysicalPythonCalcRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalPythonCalcRule
+  val INSTANCE: RelOptRule = new BatchPhysicalPythonCalcRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCalc],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalPythonCalcRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
index 35c36371297..57a557ea015 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankTyp
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
@@ -41,12 +42,7 @@ import scala.collection.JavaConversions._
  *       +- input of rank
  * }}}
  */
-class BatchPhysicalRankRule
-  extends ConverterRule(
-    classOf[FlinkLogicalRank],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalRankRule") {
+class BatchPhysicalRankRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -115,5 +111,10 @@ class BatchPhysicalRankRule
 }
 
 object BatchPhysicalRankRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalRankRule
+  val INSTANCE: RelOptRule = new BatchPhysicalRankRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalRank],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalRankRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
index 83d25367101..f7fb384887a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalScr
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that matches [[FlinkLogicalScriptTransform]]. */
-class BatchPhysicalScriptTransformRule
-  extends ConverterRule(
-    classOf[FlinkLogicalScriptTransform],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchExecScriptTrans") {
+class BatchPhysicalScriptTransformRule(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val logicalScriptTransform = rel.asInstanceOf[FlinkLogicalScriptTransform]
@@ -53,5 +49,10 @@ class BatchPhysicalScriptTransformRule
 }
 
 object BatchPhysicalScriptTransformRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalScriptTransformRule
+  val INSTANCE: RelOptRule = new BatchPhysicalScriptTransformRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalScriptTransform],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchExecScriptTrans"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
index 904802f2034..156a2478725 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
@@ -26,18 +26,15 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core._
 
 /**
  * Rule that converts [[FlinkLogicalJoin]] to [[BatchPhysicalNestedLoopJoin]] if one of the join's input
  * sides returns at most a single row.
  */
-class BatchPhysicalSingleRowJoinRule
-  extends ConverterRule(
-    classOf[FlinkLogicalJoin],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalSingleRowJoinRule")
+class BatchPhysicalSingleRowJoinRule(config: Config)
+  extends ConverterRule(config)
   with BatchPhysicalJoinRuleBase
   with BatchPhysicalNestedLoopJoinRuleBase {
 
@@ -85,5 +82,10 @@ class BatchPhysicalSingleRowJoinRule
 }
 
 object BatchPhysicalSingleRowJoinRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalSingleRowJoinRule
+  val INSTANCE: RelOptRule = new BatchPhysicalSingleRowJoinRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalJoin],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalSingleRowJoinRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
index 2317759fd54..1d055774c94 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
@@ -31,16 +31,12 @@ import org.apache.flink.table.types.logical.RowType
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.{RelCollations, RelCollationTraitDef, RelNode}
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
-class BatchPhysicalSinkRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSink],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalSinkRule") {
+class BatchPhysicalSinkRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[FlinkLogicalSink]
@@ -118,5 +114,10 @@ class BatchPhysicalSinkRule
 }
 
 object BatchPhysicalSinkRule {
-  val INSTANCE = new BatchPhysicalSinkRule
+  val INSTANCE = new BatchPhysicalSinkRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSink],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalSinkRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
index c0fb28bddc6..a360ce43863 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.sql.`type`.SqlTypeName
 
 /**
@@ -45,12 +46,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
  * }}}
  * when fetch is null
  */
-class BatchPhysicalSortLimitRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalSortLimitRule") {
+class BatchPhysicalSortLimitRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -105,5 +101,10 @@ class BatchPhysicalSortLimitRule
 }
 
 object BatchPhysicalSortLimitRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule
+  val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalSortLimitRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
index ef387d485d1..06c64365f31 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import java.lang.{Boolean => JBoolean}
 
@@ -36,12 +37,7 @@ import java.lang.{Boolean => JBoolean}
  * Rule that matches [[FlinkLogicalSort]] whose sort fields are non-empty and both `fetch` and
  * `offset` are null, and converts it to [[BatchPhysicalSort]].
  */
-class BatchPhysicalSortRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalSortRule") {
+class BatchPhysicalSortRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -72,7 +68,12 @@ class BatchPhysicalSortRule
 }
 
 object BatchPhysicalSortRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalSortRule
+  val INSTANCE: RelOptRule = new BatchPhysicalSortRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalSortRule"))
 
   // It is an experimental config and may be removed later.
   @Experimental
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
index 0bf662cfe70..424f56d9d1b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
@@ -27,15 +27,11 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.calcite.plan.RelOptRuleCall
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 
 /** Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchPhysicalTableSourceScan]]. */
-class BatchPhysicalTableSourceScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalTableSourceScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalTableSourceScanRule") {
+class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {
 
   /** Rule must only match if TableScan targets a bounded [[ScanTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -65,5 +61,10 @@ class BatchPhysicalTableSourceScanRule
 }
 
 object BatchPhysicalTableSourceScanRule {
-  val INSTANCE = new BatchPhysicalTableSourceScanRule
+  val INSTANCE: ConverterRule = new BatchPhysicalTableSourceScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalTableSourceScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalTableSourceScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
index 9bd8c52b021..df193cc41b4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
@@ -24,16 +24,12 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUni
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
 /** Rule that converts [[FlinkLogicalUnion]] to [[BatchPhysicalUnion]]. */
-class BatchPhysicalUnionRule
-  extends ConverterRule(
-    classOf[FlinkLogicalUnion],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalUnionRule") {
+class BatchPhysicalUnionRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     call.rel(0).asInstanceOf[FlinkLogicalUnion].all
@@ -49,5 +45,10 @@ class BatchPhysicalUnionRule
 }
 
 object BatchPhysicalUnionRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalUnionRule
+  val INSTANCE: RelOptRule = new BatchPhysicalUnionRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalUnion],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalUnionRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
index c762791e52d..5d6e78e7475 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalVal
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalValues]] to [[BatchPhysicalValues]]. */
-class BatchPhysicalValuesRule
-  extends ConverterRule(
-    classOf[FlinkLogicalValues],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalValuesRule") {
+class BatchPhysicalValuesRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
@@ -42,5 +38,10 @@ class BatchPhysicalValuesRule
 }
 
 object BatchPhysicalValuesRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalValuesRule
+  val INSTANCE: RelOptRule = new BatchPhysicalValuesRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalValues],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalValuesRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
index d38365b6502..c8d90a8cca2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil.convertToWindowingSt
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexCall
 
 /**
  * Rule to convert a [[FlinkLogicalTableFunctionScan]] with window table function call into a
  * [[BatchPhysicalWindowTableFunction]].
  */
-class BatchPhysicalWindowTableFunctionRule
-  extends ConverterRule(
-    classOf[FlinkLogicalTableFunctionScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.BATCH_PHYSICAL,
-    "BatchPhysicalWindowTableFunctionRule") {
+class BatchPhysicalWindowTableFunctionRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -60,5 +56,10 @@ class BatchPhysicalWindowTableFunctionRule
 }
 
 object BatchPhysicalWindowTableFunctionRule {
-  val INSTANCE: RelOptRule = new BatchPhysicalWindowTableFunctionRule
+  val INSTANCE: RelOptRule = new BatchPhysicalWindowTableFunctionRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalTableFunctionScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.BATCH_PHYSICAL,
+      "BatchPhysicalWindowTableFunctionRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
index 0bb59ff21ec..1571b23d24b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConverters._
 
 /** Rule that converts [[FlinkLogicalCalc]] to [[StreamPhysicalCalc]]. */
-class StreamPhysicalCalcRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCalc],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalCalcRule") {
+class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class StreamPhysicalCalcRule
 }
 
 object StreamPhysicalCalcRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalCalcRule
+  val INSTANCE: RelOptRule = new StreamPhysicalCalcRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCalc],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalCalcRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
index 68593c09383..83cf99c3177 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
@@ -29,15 +29,11 @@ import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.{RexNode, RexProgram, RexProgramBuilder}
 
 /** Rule that converts [[FlinkLogicalCorrelate]] to [[StreamPhysicalCorrelate]]. */
-class StreamPhysicalCorrelateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCorrelate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalCorrelateRule") {
+class StreamPhysicalCorrelateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val correlate: FlinkLogicalCorrelate = call.rel(0)
@@ -102,7 +98,12 @@ class StreamPhysicalCorrelateRule
 }
 
 object StreamPhysicalCorrelateRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule
+  val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCorrelate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalCorrelateRule"))
 
   def getMergedCalc(calc: FlinkLogicalCalc): FlinkLogicalCalc = {
     val child = calc.getInput match {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
index 2774df0c194..433fb3af17c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
@@ -25,14 +25,10 @@ import org.apache.flink.table.planner.plan.schema.DataStreamTable
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalDataStreamTableScan]] to [[StreamPhysicalDataStreamScan]]. */
-class StreamPhysicalDataStreamScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalDataStreamTableScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalDataStreamScanRule") {
+class StreamPhysicalDataStreamScanRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalDataStreamTableScan = call.rel(0)
@@ -55,5 +51,10 @@ class StreamPhysicalDataStreamScanRule
 }
 
 object StreamPhysicalDataStreamScanRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalDataStreamScanRule
+  val INSTANCE: RelOptRule = new StreamPhysicalDataStreamScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalDataStreamTableScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalDataStreamScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
index 03f5420536a..e232a4646be 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.RankUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that matches [[FlinkLogicalRank]] which is sorted by time attribute and limits 1 and its
@@ -45,12 +46,7 @@ import org.apache.calcite.rel.convert.ConverterRule
  * rowtime DESC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to
  * StreamExecDeduplicate which keeps last row in rowtime.
  */
-class StreamPhysicalDeduplicateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalRank],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalDeduplicateRule") {
+class StreamPhysicalDeduplicateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -94,5 +90,10 @@ class StreamPhysicalDeduplicateRule
 }
 
 object StreamPhysicalDeduplicateRule {
-  val INSTANCE = new StreamPhysicalDeduplicateRule
+  val INSTANCE = new StreamPhysicalDeduplicateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalRank],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalDeduplicateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
index fb61c316694..850ea926623 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalE
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalExpand]] to [[StreamPhysicalExpand]]. */
-class StreamPhysicalExpandRule
-  extends ConverterRule(
-    classOf[FlinkLogicalExpand],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalExpandRule") {
+class StreamPhysicalExpandRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val expand = rel.asInstanceOf[FlinkLogicalExpand]
@@ -47,5 +43,10 @@ class StreamPhysicalExpandRule
 }
 
 object StreamPhysicalExpandRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalExpandRule
+  val INSTANCE: RelOptRule = new StreamPhysicalExpandRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalExpand],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalExpandRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index 371788c5c07..a7f71c2e515 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -29,17 +29,13 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Aggregate.Group
 
 import scala.collection.JavaConversions._
 
 /** Rule to convert a [[FlinkLogicalAggregate]] into a [[StreamPhysicalGroupAggregate]]. */
-class StreamPhysicalGroupAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalGroupAggregateRule") {
+class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalAggregate = call.rel(0)
@@ -86,5 +82,10 @@ class StreamPhysicalGroupAggregateRule
 }
 
 object StreamPhysicalGroupAggregateRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalGroupAggregateRule
+  val INSTANCE: RelOptRule = new StreamPhysicalGroupAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalGroupAggregateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
index 6b07092398e..01eb41a1c12 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
 /**
  * Rule to convert a [[FlinkLogicalTableAggregate]] into a [[StreamPhysicalGroupTableAggregate]].
  */
-class StreamPhysicalGroupTableAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalTableAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalGroupTableAggregateRule") {
+class StreamPhysicalGroupTableAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalTableAggregate = call.rel(0)
@@ -70,5 +66,10 @@ class StreamPhysicalGroupTableAggregateRule
 }
 
 object StreamPhysicalGroupTableAggregateRule {
-  val INSTANCE: StreamPhysicalGroupTableAggregateRule = new StreamPhysicalGroupTableAggregateRule()
+  val INSTANCE: StreamPhysicalGroupTableAggregateRule = new StreamPhysicalGroupTableAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalTableAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalGroupTableAggregateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
index 2fe94c717af..9bfaebdd07c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.calcite.FlinkContext
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
@@ -30,6 +29,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Aggregate.Group
 
 import scala.collection.JavaConversions._
@@ -37,12 +37,7 @@ import scala.collection.JavaConversions._
 /**
  * Rule to convert a [[FlinkLogicalWindowAggregate]] into a [[StreamPhysicalGroupWindowAggregate]].
  */
-class StreamPhysicalGroupWindowAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalWindowAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalGroupWindowAggregateRule") {
+class StreamPhysicalGroupWindowAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalWindowAggregate = call.rel(0)
@@ -88,5 +83,10 @@ class StreamPhysicalGroupWindowAggregateRule
 }
 
 object StreamPhysicalGroupWindowAggregateRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowAggregateRule
+  val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalWindowAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalGroupWindowAggregateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
index 7379ef41dc3..e5f6dd35d83 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
@@ -35,12 +36,7 @@ import scala.collection.JavaConversions._
  * Rule to convert a [[FlinkLogicalWindowTableAggregate]] into a
  * [[StreamPhysicalGroupWindowTableAggregate]].
  */
-class StreamPhysicalGroupWindowTableAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalWindowTableAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalGroupWindowTableAggregateRule") {
+class StreamPhysicalGroupWindowTableAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalWindowTableAggregate = call.rel(0)
@@ -86,5 +82,10 @@ class StreamPhysicalGroupWindowTableAggregateRule
 }
 
 object StreamPhysicalGroupWindowTableAggregateRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowTableAggregateRule
+  val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowTableAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalWindowTableAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalGroupWindowTableAggregateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
index 31119899586..30752544b14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
@@ -24,17 +24,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalI
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that converts [[FlinkLogicalIntermediateTableScan]] to
  * [[StreamPhysicalIntermediateTableScan]].
  */
-class StreamPhysicalIntermediateTableScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalIntermediateTableScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalIntermediateTableScanRule") {
+class StreamPhysicalIntermediateTableScanRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[FlinkLogicalIntermediateTableScan]
@@ -44,5 +40,10 @@ class StreamPhysicalIntermediateTableScanRule
 }
 
 object StreamPhysicalIntermediateTableScanRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalIntermediateTableScanRule
+  val INSTANCE: RelOptRule = new StreamPhysicalIntermediateTableScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalIntermediateTableScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalIntermediateTableScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
index 9170336a2d2..a710bd3f008 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
@@ -27,15 +27,11 @@ import org.apache.flink.table.sinks.PartitionableTableSink
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
-class StreamPhysicalLegacySinkRule
-  extends ConverterRule(
-    classOf[FlinkLogicalLegacySink],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalLegacySinkRule") {
+class StreamPhysicalLegacySinkRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[FlinkLogicalLegacySink]
@@ -89,5 +85,10 @@ class StreamPhysicalLegacySinkRule
 }
 
 object StreamPhysicalLegacySinkRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalLegacySinkRule
+  val INSTANCE: RelOptRule = new StreamPhysicalLegacySinkRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalLegacySink],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalLegacySinkRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
index 80705361b2a..3df07bac263 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.sources.StreamTableSource
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 
 /**
  * Rule that converts [[FlinkLogicalLegacyTableSourceScan]] to
  * [[StreamPhysicalLegacyTableSourceScan]].
  */
-class StreamPhysicalLegacyTableSourceScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalLegacyTableSourceScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalLegacyTableSourceScanRule") {
+class StreamPhysicalLegacyTableSourceScanRule(config: Config) extends ConverterRule(config) {
 
   /** Rule must only match if TableScan targets a [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -67,5 +63,10 @@ class StreamPhysicalLegacyTableSourceScanRule
 }
 
 object StreamPhysicalLegacyTableSourceScanRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalLegacyTableSourceScanRule
+  val INSTANCE: RelOptRule = new StreamPhysicalLegacyTableSourceScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalLegacyTableSourceScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalLegacyTableSourceScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
index 6f69285c234..5ffd3ea872f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
@@ -25,17 +25,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalL
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that matches [[FlinkLogicalSort]] with empty sort fields, and converts it to
  * [[StreamPhysicalLimit]].
  */
-class StreamPhysicalLimitRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalLimitRule") {
+class StreamPhysicalLimitRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -65,5 +61,10 @@ class StreamPhysicalLimitRule
 }
 
 object StreamPhysicalLimitRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalLimitRule
+  val INSTANCE: RelOptRule = new StreamPhysicalLimitRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalLimitRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
index 9070b111fc2..60fdaceb93c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConverters._
 
@@ -35,12 +36,7 @@ import scala.collection.JavaConverters._
  * StreamExecOverAggregate currently supports only one [[org.apache.calcite.rel.core.Window.Group]];
  * otherwise it throws an exception.
  */
-class StreamPhysicalOverAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalOverAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalOverAggregateRule") {
+class StreamPhysicalOverAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val logicWindow: FlinkLogicalOverAggregate =
@@ -82,5 +78,10 @@ class StreamPhysicalOverAggregateRule
 }
 
 object StreamPhysicalOverAggregateRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalOverAggregateRule
+  val INSTANCE: RelOptRule = new StreamPhysicalOverAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalOverAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalOverAggregateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
index 9f26fba85ac..0e51aaa9dff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConverters._
 
 /** Rule that converts [[FlinkLogicalCalc]] to [[StreamPhysicalPythonCalc]]. */
-class StreamPhysicalPythonCalcRule
-  extends ConverterRule(
-    classOf[FlinkLogicalCalc],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalPythonCalcRule") {
+class StreamPhysicalPythonCalcRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0)
@@ -57,5 +53,10 @@ class StreamPhysicalPythonCalcRule
 }
 
 object StreamPhysicalPythonCalcRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalPythonCalcRule
+  val INSTANCE: RelOptRule = new StreamPhysicalPythonCalcRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalCalc],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalPythonCalcRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
index 52ad809acd7..08c3616ae1f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
@@ -20,24 +20,19 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
 import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil}
 
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that converts a [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. NOTE: the rank
  * cannot be converted to [[StreamPhysicalDeduplicate]].
  */
-class StreamPhysicalRankRule
-  extends ConverterRule(
-    classOf[FlinkLogicalRank],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalRankRule") {
+class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -73,5 +68,10 @@ class StreamPhysicalRankRule
 }
 
 object StreamPhysicalRankRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalRankRule
+  val INSTANCE: RelOptRule = new StreamPhysicalRankRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalRank],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalRankRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index 8f159eaaf8b..c7e4158d2d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -30,16 +30,12 @@ import org.apache.flink.table.types.logical.RowType
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
-class StreamPhysicalSinkRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSink],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalSinkRule") {
+class StreamPhysicalSinkRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val sink = rel.asInstanceOf[FlinkLogicalSink]
@@ -109,5 +105,10 @@ class StreamPhysicalSinkRule
 }
 
 object StreamPhysicalSinkRule {
-  val INSTANCE = new StreamPhysicalSinkRule
+  val INSTANCE = new StreamPhysicalSinkRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSink],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalSinkRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
index f3330f55ace..9bf4c51eb23 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
@@ -26,17 +26,13 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset,
  * and converts it to [[StreamPhysicalSortLimit]].
  */
-class StreamPhysicalSortLimitRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalSortLimitRule") {
+class StreamPhysicalSortLimitRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -67,5 +63,10 @@ class StreamPhysicalSortLimitRule
 }
 
 object StreamPhysicalSortLimitRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule
+  val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalSortLimitRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
index adf757cbfca..63958f7235c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
@@ -25,17 +25,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalS
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that matches a [[FlinkLogicalSort]] whose `fetch` is null or 0, and converts it to
  * [[StreamPhysicalSort]].
  */
-class StreamPhysicalSortRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalSortRule") {
+class StreamPhysicalSortRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -59,5 +55,10 @@ class StreamPhysicalSortRule
 }
 
 object StreamPhysicalSortRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalSortRule
+  val INSTANCE: RelOptRule = new StreamPhysicalSortRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalSortRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index e7c390e07bd..49632bf60e0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.TableScan
 
 /**
@@ -38,12 +39,7 @@ import org.apache.calcite.rel.core.TableScan
  * <p>Depending on whether this is a scan source, this rule will also generate
  * [[StreamPhysicalChangelogNormalize]] to materialize the upsert stream.
  */
-class StreamPhysicalTableSourceScanRule
-  extends ConverterRule(
-    classOf[FlinkLogicalTableSourceScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalTableSourceScanRule") {
+class StreamPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {
 
   /** Rule must only match if TableScan targets a [[ScanTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -99,5 +95,10 @@ class StreamPhysicalTableSourceScanRule
 }
 
 object StreamPhysicalTableSourceScanRule {
-  val INSTANCE = new StreamPhysicalTableSourceScanRule
+  val INSTANCE = new StreamPhysicalTableSourceScanRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalTableSourceScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalTableSourceScanRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
index 78ccf2dfcf0..6005e096383 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
@@ -27,17 +27,13 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /**
  * Rule that matches a [[FlinkLogicalSort]] sorted by a time attribute in ascending order whose
  * `fetch` and `offset` are null, and converts it to [[StreamPhysicalTemporalSort]].
  */
-class StreamPhysicalTemporalSortRule
-  extends ConverterRule(
-    classOf[FlinkLogicalSort],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalTemporalSortRule") {
+class StreamPhysicalTemporalSortRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val sort: FlinkLogicalSort = call.rel(0)
@@ -62,7 +58,12 @@ class StreamPhysicalTemporalSortRule
 }
 
 object StreamPhysicalTemporalSortRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalTemporalSortRule
+  val INSTANCE: RelOptRule = new StreamPhysicalTemporalSortRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalTemporalSortRule"))
 
   /**
    * Whether the given sort could be converted to [[StreamPhysicalTemporalSort]].
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
index 3e133b6325e..00131e78332 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
@@ -24,16 +24,12 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalU
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 import scala.collection.JavaConversions._
 
 /** Rule that converts [[FlinkLogicalUnion]] to [[StreamPhysicalUnion]]. */
-class StreamPhysicalUnionRule
-  extends ConverterRule(
-    classOf[FlinkLogicalUnion],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalUnionRule") {
+class StreamPhysicalUnionRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     call.rel(0).asInstanceOf[FlinkLogicalUnion].all
@@ -49,5 +45,10 @@ class StreamPhysicalUnionRule
 }
 
 object StreamPhysicalUnionRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalUnionRule
+  val INSTANCE: RelOptRule = new StreamPhysicalUnionRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalUnion],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalUnionRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
index 3d6fcd3dbc7..eef97aa0e06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalV
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalValues]] to [[StreamPhysicalValues]]. */
-class StreamPhysicalValuesRule
-  extends ConverterRule(
-    classOf[FlinkLogicalValues],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalValuesRule") {
+class StreamPhysicalValuesRule(config: Config) extends ConverterRule(config) {
 
   def convert(rel: RelNode): RelNode = {
     val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
@@ -42,5 +38,10 @@ class StreamPhysicalValuesRule
 }
 
 object StreamPhysicalValuesRule {
-  val INSTANCE: RelOptRule = new StreamPhysicalValuesRule
+  val INSTANCE: RelOptRule = new StreamPhysicalValuesRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalValues],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalValuesRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
index 337e2ec1c1e..c6b62b0d09e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalW
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. */
-class StreamPhysicalWatermarkAssignerRule
-  extends ConverterRule(
-    classOf[FlinkLogicalWatermarkAssigner],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalWatermarkAssignerRule") {
+class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) {
 
   override def convert(rel: RelNode): RelNode = {
     val watermarkAssigner = rel.asInstanceOf[FlinkLogicalWatermarkAssigner]
@@ -50,5 +46,10 @@ class StreamPhysicalWatermarkAssignerRule
 }
 
 object StreamPhysicalWatermarkAssignerRule {
-  val INSTANCE = new StreamPhysicalWatermarkAssignerRule
+  val INSTANCE = new StreamPhysicalWatermarkAssignerRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalWatermarkAssigner],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalWatermarkAssignerRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
index e33bac81876..4778c4c9c0a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.runtime.groupwindow._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rel.core.Aggregate.Group
 import org.apache.calcite.rex.{RexInputRef, RexProgram}
 
@@ -39,12 +40,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /** Rule to convert a [[FlinkLogicalAggregate]] into a [[StreamPhysicalWindowAggregate]]. */
-class StreamPhysicalWindowAggregateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalAggregate],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalWindowAggregateRule") {
+class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: FlinkLogicalAggregate = call.rel(0)
@@ -247,7 +243,12 @@ class StreamPhysicalWindowAggregateRule
 }
 
 object StreamPhysicalWindowAggregateRule {
-  val INSTANCE = new StreamPhysicalWindowAggregateRule
+  val INSTANCE = new StreamPhysicalWindowAggregateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalAggregate],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalWindowAggregateRule"))
 
   private val WINDOW_START: String = "window_start"
   private val WINDOW_END: String = "window_end"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
index e145ceb3fb3..c45229062d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
@@ -28,14 +28,10 @@ import org.apache.flink.table.planner.plan.utils.{RankUtil, WindowUtil}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule to convert a [[FlinkLogicalRank]] into a [[StreamPhysicalWindowDeduplicate]]. */
-class StreamPhysicalWindowDeduplicateRule
-  extends ConverterRule(
-    classOf[FlinkLogicalRank],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalWindowDeduplicateRule") {
+class StreamPhysicalWindowDeduplicateRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -89,5 +85,10 @@ class StreamPhysicalWindowDeduplicateRule
 }
 
 object StreamPhysicalWindowDeduplicateRule {
-  val INSTANCE = new StreamPhysicalWindowDeduplicateRule
+  val INSTANCE = new StreamPhysicalWindowDeduplicateRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalRank],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalWindowDeduplicateRule"))
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
index 7193295df3a..7259e3495e0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
@@ -28,14 +28,10 @@ import org.apache.flink.table.planner.plan.utils.{RankUtil, WindowUtil}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 
 /** Rule to convert a [[FlinkLogicalRank]] into a [[StreamPhysicalWindowRank]]. */
-class StreamPhysicalWindowRankRule
-  extends ConverterRule(
-    classOf[FlinkLogicalRank],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalWindowRankRule") {
+class StreamPhysicalWindowRankRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -89,6 +85,11 @@ class StreamPhysicalWindowRankRule
 }
 
 object StreamPhysicalWindowRankRule {
-  val INSTANCE = new StreamPhysicalWindowRankRule
+  val INSTANCE = new StreamPhysicalWindowRankRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalRank],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalWindowRankRule"))
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
index 23dc245e050..6ceeef69ac1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingS
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
 import org.apache.calcite.rex.RexCall
 
 /**
  * Rule to convert a [[FlinkLogicalTableFunctionScan]] with a window table function call into a
  * [[StreamPhysicalWindowTableFunction]].
  */
-class StreamPhysicalWindowTableFunctionRule
-  extends ConverterRule(
-    classOf[FlinkLogicalTableFunctionScan],
-    FlinkConventions.LOGICAL,
-    FlinkConventions.STREAM_PHYSICAL,
-    "StreamPhysicalWindowTableFunctionRule") {
+class StreamPhysicalWindowTableFunctionRule(config: Config) extends ConverterRule(config) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -64,5 +60,10 @@ class StreamPhysicalWindowTableFunctionRule
 }
 
 object StreamPhysicalWindowTableFunctionRule {
-  val INSTANCE = new StreamPhysicalWindowTableFunctionRule
+  val INSTANCE = new StreamPhysicalWindowTableFunctionRule(
+    Config.INSTANCE.withConversion(
+      classOf[FlinkLogicalTableFunctionScan],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.STREAM_PHYSICAL,
+      "StreamPhysicalWindowTableFunctionRule"))
 }
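
For readers who want to try the config-based ConverterRule API outside the Flink
planner, the following is a minimal, self-contained sketch against plain Calcite.
It is not part of this commit, MyFilterConverterRule is a hypothetical name, and
the conversion body is intentionally stubbed:

    import org.apache.calcite.adapter.enumerable.EnumerableConvention
    import org.apache.calcite.plan.{Convention, RelOptRule}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.convert.ConverterRule
    import org.apache.calcite.rel.convert.ConverterRule.Config
    import org.apache.calcite.rel.logical.LogicalFilter

    /** Hypothetical rule that would convert a LogicalFilter from NONE to the enumerable convention. */
    class MyFilterConverterRule(config: Config) extends ConverterRule(config) {

      override def convert(rel: RelNode): RelNode = {
        // Placeholder body: a real rule would build and return the physical
        // counterpart carrying the target convention in its trait set.
        // Returning null tells the planner that no conversion was produced.
        null
      }
    }

    object MyFilterConverterRule {
      // The matching metadata (input node class, source trait, target trait,
      // description) is carried by the immutable Config instead of the
      // deprecated four-argument ConverterRule constructor.
      val INSTANCE: RelOptRule = new MyFilterConverterRule(
        Config.INSTANCE.withConversion(
          classOf[LogicalFilter],
          Convention.NONE,
          EnumerableConvention.INSTANCE,
          "MyFilterConverterRule"))
    }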