You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2023/01/25 08:40:21 UTC
[flink] branch master updated: [FLINK-29215][table-planner] Use config-based constructors for converter rules instead of the deprecated ones
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0ce8cb1f2bd [FLINK-29215][table-planner] Use config-based constructors for converter rules instead of the deprecated ones
0ce8cb1f2bd is described below
commit 0ce8cb1f2bd13c7cfca8b777972db0cbaa99af46
Author: Sergey Nuyanzin <sn...@gmail.com>
AuthorDate: Tue Aug 23 11:12:24 2022 +0200
[FLINK-29215][table-planner] Use config-based constructors for converter rules instead of the deprecated ones
This closes #20773.
---
.../batch/BatchPhysicalPythonAggregateRule.java | 20 +++++++-----
.../batch/BatchPhysicalPythonCorrelateRule.java | 18 +++++++----
.../physical/common/CommonPhysicalMatchRule.java | 2 +-
.../stream/StreamPhysicalPythonCorrelateRule.java | 18 +++++++----
.../StreamPhysicalPythonGroupAggregateRule.java | 18 +++++++----
...treamPhysicalPythonGroupTableAggregateRule.java | 18 +++++++----
...reamPhysicalPythonGroupWindowAggregateRule.java | 19 ++++++-----
.../StreamPhysicalPythonOverAggregateRule.java | 18 +++++++----
.../plan/nodes/logical/FlinkLogicalAggregate.scala | 37 ++++++++++++++--------
.../plan/nodes/logical/FlinkLogicalCalc.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalCorrelate.scala | 17 ++++++----
.../logical/FlinkLogicalDataStreamTableScan.scala | 15 +++++----
.../nodes/logical/FlinkLogicalDistribution.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalExpand.scala | 15 +++++----
.../FlinkLogicalIntermediateTableScan.scala | 16 +++++-----
.../plan/nodes/logical/FlinkLogicalIntersect.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalJoin.scala | 15 +++++----
.../nodes/logical/FlinkLogicalLegacySink.scala | 15 +++++----
.../FlinkLogicalLegacyTableSourceScan.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalMatch.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalMinus.scala | 15 +++++----
.../nodes/logical/FlinkLogicalOverAggregate.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalRank.scala | 15 +++++----
.../logical/FlinkLogicalScriptTransform.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalSink.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalSnapshot.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalSort.scala | 29 +++++++++--------
.../nodes/logical/FlinkLogicalTableAggregate.scala | 15 +++++----
.../logical/FlinkLogicalTableFunctionScan.scala | 17 +++++-----
.../logical/FlinkLogicalTableSourceScan.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalUnion.scala | 15 +++++----
.../plan/nodes/logical/FlinkLogicalValues.scala | 15 +++++----
.../logical/FlinkLogicalWatermarkAssigner.scala | 15 +++++----
.../logical/FlinkLogicalWindowAggregate.scala | 15 +++++----
.../logical/FlinkLogicalWindowTableAggregate.scala | 15 +++++----
.../batch/BatchPhysicalBoundedStreamScanRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalCalcRule.scala | 15 +++++----
.../batch/BatchPhysicalCorrelateRule.scala | 15 +++++----
.../batch/BatchPhysicalDistributionRule.scala | 16 +++++-----
.../physical/batch/BatchPhysicalExpandRule.scala | 15 +++++----
.../BatchPhysicalIntermediateTableScanRule.scala | 15 +++++----
.../batch/BatchPhysicalLegacySinkRule.scala | 15 +++++----
.../BatchPhysicalLegacyTableSourceScanRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalLimitRule.scala | 15 +++++----
.../batch/BatchPhysicalPythonCalcRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalRankRule.scala | 15 +++++----
.../batch/BatchPhysicalScriptTransformRule.scala | 15 +++++----
.../batch/BatchPhysicalSingleRowJoinRule.scala | 16 ++++++----
.../physical/batch/BatchPhysicalSinkRule.scala | 15 +++++----
.../batch/BatchPhysicalSortLimitRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalSortRule.scala | 15 +++++----
.../batch/BatchPhysicalTableSourceScanRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalUnionRule.scala | 15 +++++----
.../physical/batch/BatchPhysicalValuesRule.scala | 15 +++++----
.../BatchPhysicalWindowTableFunctionRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalCalcRule.scala | 15 +++++----
.../stream/StreamPhysicalCorrelateRule.scala | 15 +++++----
.../stream/StreamPhysicalDataStreamScanRule.scala | 15 +++++----
.../stream/StreamPhysicalDeduplicateRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalExpandRule.scala | 15 +++++----
.../stream/StreamPhysicalGroupAggregateRule.scala | 15 +++++----
.../StreamPhysicalGroupTableAggregateRule.scala | 15 +++++----
.../StreamPhysicalGroupWindowAggregateRule.scala | 16 +++++-----
...reamPhysicalGroupWindowTableAggregateRule.scala | 15 +++++----
.../StreamPhysicalIntermediateTableScanRule.scala | 15 +++++----
.../stream/StreamPhysicalLegacySinkRule.scala | 15 +++++----
.../StreamPhysicalLegacyTableSourceScanRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalLimitRule.scala | 15 +++++----
.../stream/StreamPhysicalOverAggregateRule.scala | 15 +++++----
.../stream/StreamPhysicalPythonCalcRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalRankRule.scala | 18 +++++------
.../physical/stream/StreamPhysicalSinkRule.scala | 15 +++++----
.../stream/StreamPhysicalSortLimitRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalSortRule.scala | 15 +++++----
.../stream/StreamPhysicalTableSourceScanRule.scala | 15 +++++----
.../stream/StreamPhysicalTemporalSortRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalUnionRule.scala | 15 +++++----
.../physical/stream/StreamPhysicalValuesRule.scala | 15 +++++----
.../StreamPhysicalWatermarkAssignerRule.scala | 15 +++++----
.../stream/StreamPhysicalWindowAggregateRule.scala | 15 +++++----
.../StreamPhysicalWindowDeduplicateRule.scala | 15 +++++----
.../stream/StreamPhysicalWindowRankRule.scala | 15 +++++----
.../StreamPhysicalWindowTableFunctionRule.scala | 15 +++++----
83 files changed, 706 insertions(+), 597 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
index a4d1ca9747a..6552bb69df5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonAggregateRule.java
@@ -55,14 +55,18 @@ import scala.collection.Seq;
*/
public class BatchPhysicalPythonAggregateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new BatchPhysicalPythonAggregateRule();
-
- private BatchPhysicalPythonAggregateRule() {
- super(
- FlinkLogicalAggregate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.BATCH_PHYSICAL(),
- "BatchPhysicalPythonAggregateRule");
+ public static final RelOptRule INSTANCE =
+ new BatchPhysicalPythonAggregateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalAggregate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.BATCH_PHYSICAL(),
+ "BatchPhysicalPythonAggregateRule")
+ .withRuleFactory(BatchPhysicalPythonAggregateRule::new));
+
+ protected BatchPhysicalPythonAggregateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
index 0a81fb2aad5..7f3e1c5d9d7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
@@ -42,14 +42,18 @@ import scala.Some;
*/
public class BatchPhysicalPythonCorrelateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new BatchPhysicalPythonCorrelateRule();
+ public static final RelOptRule INSTANCE =
+ new BatchPhysicalPythonCorrelateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalCorrelate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.BATCH_PHYSICAL(),
+ "BatchPhysicalPythonCorrelateRule")
+ .withRuleFactory(BatchPhysicalPythonCorrelateRule::new));
- private BatchPhysicalPythonCorrelateRule() {
- super(
- FlinkLogicalCorrelate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.BATCH_PHYSICAL(),
- "BatchPhysicalPythonCorrelateRule");
+ private BatchPhysicalPythonCorrelateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
index ad1b15e8ea8..5476fbbbebe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -52,7 +52,7 @@ public abstract class CommonPhysicalMatchRule extends ConverterRule {
public CommonPhysicalMatchRule(
Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
- super(clazz, in, out, descriptionPrefix);
+ super(Config.INSTANCE.as(Config.class).withConversion(clazz, in, out, descriptionPrefix));
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
index 3fcbe0f3f96..637cc08a594 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
@@ -42,14 +42,18 @@ import scala.Some;
*/
public class StreamPhysicalPythonCorrelateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new StreamPhysicalPythonCorrelateRule();
+ public static final RelOptRule INSTANCE =
+ new StreamPhysicalPythonCorrelateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalCorrelate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalPythonCorrelateRule")
+ .withRuleFactory(StreamPhysicalPythonCorrelateRule::new));
- private StreamPhysicalPythonCorrelateRule() {
- super(
- FlinkLogicalCorrelate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.STREAM_PHYSICAL(),
- "StreamPhysicalPythonCorrelateRule");
+ private StreamPhysicalPythonCorrelateRule(Config config) {
+ super(config);
}
// find only calc and table function
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
index 602bdefeb42..acae431f2a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
@@ -43,14 +43,18 @@ import java.util.List;
*/
public class StreamPhysicalPythonGroupAggregateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new StreamPhysicalPythonGroupAggregateRule();
+ public static final RelOptRule INSTANCE =
+ new StreamPhysicalPythonGroupAggregateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalAggregate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalPythonGroupAggregateRule")
+ .withRuleFactory(StreamPhysicalPythonGroupAggregateRule::new));
- public StreamPhysicalPythonGroupAggregateRule() {
- super(
- FlinkLogicalAggregate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.STREAM_PHYSICAL(),
- "StreamPhysicalPythonGroupAggregateRule");
+ public StreamPhysicalPythonGroupAggregateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
index 81bd14658ce..cbe6fbcc7de 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
@@ -42,14 +42,18 @@ import java.util.List;
*/
public class StreamPhysicalPythonGroupTableAggregateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new StreamPhysicalPythonGroupTableAggregateRule();
+ public static final RelOptRule INSTANCE =
+ new StreamPhysicalPythonGroupTableAggregateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalTableAggregate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalPythonGroupTableAggregateRule")
+ .withRuleFactory(StreamPhysicalPythonGroupTableAggregateRule::new));
- public StreamPhysicalPythonGroupTableAggregateRule() {
- super(
- FlinkLogicalTableAggregate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.STREAM_PHYSICAL(),
- "StreamPhysicalPythonGroupTableAggregateRule");
+ public StreamPhysicalPythonGroupTableAggregateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index 5f080d76da7..cdb2f54d3cb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -50,14 +50,17 @@ import java.util.List;
public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule {
public static final StreamPhysicalPythonGroupWindowAggregateRule INSTANCE =
- new StreamPhysicalPythonGroupWindowAggregateRule();
-
- private StreamPhysicalPythonGroupWindowAggregateRule() {
- super(
- FlinkLogicalWindowAggregate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.STREAM_PHYSICAL(),
- "StreamPhysicalPythonGroupWindowAggregateRule");
+ new StreamPhysicalPythonGroupWindowAggregateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalWindowAggregate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalPythonGroupWindowAggregateRule")
+ .withRuleFactory(StreamPhysicalPythonGroupWindowAggregateRule::new));
+
+ private StreamPhysicalPythonGroupWindowAggregateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
index 86da96b9caa..934daf42e1c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.java
@@ -41,15 +41,19 @@ import java.util.List;
* StreamPhysicalPythonOverAggregate}.
*/
public class StreamPhysicalPythonOverAggregateRule extends ConverterRule {
+
public static final StreamPhysicalPythonOverAggregateRule INSTANCE =
- new StreamPhysicalPythonOverAggregateRule();
+ new StreamPhysicalPythonOverAggregateRule(
+ Config.INSTANCE
+ .withConversion(
+ FlinkLogicalOverAggregate.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalPythonOverAggregateRule")
+ .withRuleFactory(StreamPhysicalPythonOverAggregateRule::new));
- private StreamPhysicalPythonOverAggregateRule() {
- super(
- FlinkLogicalOverAggregate.class,
- FlinkConventions.LOGICAL(),
- FlinkConventions.STREAM_PHYSICAL(),
- "StreamPhysicalPythonOverAggregateRule");
+ private StreamPhysicalPythonOverAggregateRule(Config config) {
+ super(config);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 27d03f9470d..0c9c1f2ca03 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -94,12 +94,8 @@ class FlinkLogicalAggregate(
}
}
-private class FlinkLogicalAggregateBatchConverter
- extends ConverterRule(
- classOf[LogicalAggregate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalAggregateBatchConverter") {
+private class FlinkLogicalAggregateBatchConverter(config: ConverterRule.Config)
+ extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg = call.rel(0).asInstanceOf[LogicalAggregate]
@@ -126,12 +122,8 @@ private class FlinkLogicalAggregateBatchConverter
}
}
-private class FlinkLogicalAggregateStreamConverter
- extends ConverterRule(
- classOf[LogicalAggregate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalAggregateStreamConverter") {
+private class FlinkLogicalAggregateStreamConverter(config: ConverterRule.Config)
+ extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg = call.rel(0).asInstanceOf[LogicalAggregate]
@@ -152,8 +144,25 @@ private class FlinkLogicalAggregateStreamConverter
}
object FlinkLogicalAggregate {
- val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter()
- val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter()
+
+ val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter(
+ ConverterRule.Config.INSTANCE
+ .withConversion(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalAggregateBatchConverter")
+ .withRuleFactory(
+ (config: ConverterRule.Config) => new FlinkLogicalAggregateBatchConverter(config)))
+ val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter(
+ ConverterRule.Config.INSTANCE
+ .withConversion(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalAggregateStreamConverter")
+ .withRuleFactory(
+ (config: ConverterRule.Config) => new FlinkLogicalAggregateStreamConverter(config)))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
index 95a4489cd69..eba39f46de6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.calcite.rel.metadata.RelMdCollation
@@ -49,12 +50,7 @@ class FlinkLogicalCalc(
}
-private class FlinkLogicalCalcConverter
- extends ConverterRule(
- classOf[LogicalCalc],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalCalcConverter") {
+private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val calc = rel.asInstanceOf[LogicalCalc]
@@ -64,7 +60,12 @@ private class FlinkLogicalCalcConverter
}
object FlinkLogicalCalc {
- val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalCalc],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalCalcConverter"))
def create(input: RelNode, calcProgram: RexProgram): FlinkLogicalCalc = {
val cluster = input.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
index 8378c9c70aa..aba0e4051e8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -61,12 +61,7 @@ class FlinkLogicalCorrelate(
}
-class FlinkLogicalCorrelateConverter
- extends ConverterRule(
- classOf[LogicalCorrelate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalCorrelateConverter") {
+class FlinkLogicalCorrelateConverter(config: ConverterRule.Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val correlate = rel.asInstanceOf[LogicalCorrelate]
@@ -82,7 +77,15 @@ class FlinkLogicalCorrelateConverter
}
object FlinkLogicalCorrelate {
- val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter(
+ ConverterRule.Config.INSTANCE
+ .withConversion(
+ classOf[LogicalCorrelate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalCorrelateConverter")
+ .withRuleFactory(
+ (config: ConverterRule.Config) => new FlinkLogicalCorrelateConverter(config)))
def create(
left: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index 4759ab5c8ab..76738aa2bc9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
@@ -66,12 +67,7 @@ class FlinkLogicalDataStreamTableScan(
}
-class FlinkLogicalDataStreamTableScanConverter
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalDataStreamTableScanConverter") {
+class FlinkLogicalDataStreamTableScanConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0)
@@ -86,7 +82,12 @@ class FlinkLogicalDataStreamTableScanConverter
}
object FlinkLogicalDataStreamTableScan {
- val CONVERTER = new FlinkLogicalDataStreamTableScanConverter
+ val CONVERTER = new FlinkLogicalDataStreamTableScanConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalDataStreamTableScanConverter"))
def isDataStreamTableScan(scan: TableScan): Boolean = {
val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
index 7c2b632e410..a537ce32f64 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDistribution.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, SingleRel}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import java.util.{List => JList}
@@ -41,12 +42,7 @@ class FlinkLogicalDistribution(
new FlinkLogicalDistribution(getCluster, traitSet, inputs.get(0), collation, distKeys)
}
-class FlinkLogicalDistributionBatchConverter
- extends ConverterRule(
- classOf[LogicalDistribution],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalDistributionBatchConverter") {
+class FlinkLogicalDistributionBatchConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val distribution = rel.asInstanceOf[LogicalDistribution]
@@ -56,7 +52,12 @@ class FlinkLogicalDistributionBatchConverter
}
object FlinkLogicalDistribution {
- val BATCH_CONVERTER: RelOptRule = new FlinkLogicalDistributionBatchConverter
+ val BATCH_CONVERTER: RelOptRule = new FlinkLogicalDistributionBatchConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalDistribution],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalDistributionBatchConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
index c9398d1f186..4cc0de30e7a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExpand.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, LogicalExpand}
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexNode
import java.util
@@ -46,12 +47,7 @@ class FlinkLogicalExpand(
}
-private class FlinkLogicalExpandConverter
- extends ConverterRule(
- classOf[LogicalExpand],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalExpandConverter") {
+private class FlinkLogicalExpandConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val expand = rel.asInstanceOf[LogicalExpand]
@@ -61,7 +57,12 @@ private class FlinkLogicalExpandConverter
}
object FlinkLogicalExpand {
- val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalExpand],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalExpandConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
index bf7594668ad..8ffd6c33577 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntermediateTableScan.scala
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
@@ -45,12 +46,7 @@ class FlinkLogicalIntermediateTableScan(
}
-class FlinkLogicalIntermediateTableScanConverter
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalIntermediateTableScanConverter") {
+class FlinkLogicalIntermediateTableScanConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0)
@@ -65,8 +61,12 @@ class FlinkLogicalIntermediateTableScanConverter
}
object FlinkLogicalIntermediateTableScan {
-
- val CONVERTER = new FlinkLogicalIntermediateTableScanConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalIntermediateTableScanConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalIntermediateTableScanConverter"))
def create(
cluster: RelOptCluster,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
index 78468c8df35..ecb201258ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalIntersect.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{Intersect, SetOp}
import org.apache.calcite.rel.logical.LogicalIntersect
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -59,12 +60,7 @@ class FlinkLogicalIntersect(
}
-private class FlinkLogicalIntersectConverter
- extends ConverterRule(
- classOf[LogicalIntersect],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalIntersectConverter") {
+private class FlinkLogicalIntersectConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val intersect = rel.asInstanceOf[LogicalIntersect]
@@ -76,7 +72,12 @@ private class FlinkLogicalIntersectConverter
}
object FlinkLogicalIntersect {
- val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalIntersect],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalIntersectConverter"))
def create(inputs: util.List[RelNode], all: Boolean): FlinkLogicalIntersect = {
val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
index 8f1d42bd3c7..32dd6a7fbfd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalJoin
@@ -78,12 +79,7 @@ class FlinkLogicalJoin(
}
/** Support all joins. */
-private class FlinkLogicalJoinConverter
- extends ConverterRule(
- classOf[LogicalJoin],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalJoinConverter") {
+private class FlinkLogicalJoinConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val join = rel.asInstanceOf[LogicalJoin]
@@ -94,7 +90,12 @@ private class FlinkLogicalJoinConverter
}
object FlinkLogicalJoin {
- val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalJoin],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalJoinConverter"))
def create(
left: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
index f825e84e861..00c79162e15 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.sinks.TableSink
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.hint.RelHint
import java.util
@@ -61,12 +62,7 @@ class FlinkLogicalLegacySink(
}
-private class FlinkLogicalLegacySinkConverter
- extends ConverterRule(
- classOf[LogicalLegacySink],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalLegacySinkConverter") {
+private class FlinkLogicalLegacySinkConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[LogicalLegacySink]
@@ -82,7 +78,12 @@ private class FlinkLogicalLegacySinkConverter
}
object FlinkLogicalLegacySink {
- val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalLegacySink],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalLegacySinkConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
index 5f6abfea564..328e89bc509 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
@@ -83,12 +84,7 @@ class FlinkLogicalLegacyTableSourceScan(
}
-class FlinkLogicalLegacyTableSourceScanConverter
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalLegacyTableSourceScanConverter") {
+class FlinkLogicalLegacyTableSourceScanConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0)
@@ -103,7 +99,12 @@ class FlinkLogicalLegacyTableSourceScanConverter
}
object FlinkLogicalLegacyTableSourceScan {
- val CONVERTER = new FlinkLogicalLegacyTableSourceScanConverter
+ val CONVERTER = new FlinkLogicalLegacyTableSourceScanConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalLegacyTableSourceScanConverter"))
def isTableSourceScan(scan: TableScan): Boolean = {
val tableSourceTable = scan.getTable.unwrap(classOf[LegacyTableSourceTable[_]])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
index d02f7cb3e0d..993abb5956f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMatch.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Match
import org.apache.calcite.rel.logical.LogicalMatch
import org.apache.calcite.rex.RexNode
@@ -109,12 +110,7 @@ class FlinkLogicalMatch(
}
}
-private class FlinkLogicalMatchConverter
- extends ConverterRule(
- classOf[LogicalMatch],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalMatchConverter") {
+private class FlinkLogicalMatchConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val logicalMatch = rel.asInstanceOf[LogicalMatch]
@@ -141,5 +137,10 @@ private class FlinkLogicalMatchConverter
}
object FlinkLogicalMatch {
- val CONVERTER: ConverterRule = new FlinkLogicalMatchConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalMatchConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalMatch],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalMatchConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
index 330d19f5c7a..1f89eeaf2d6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMinus.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{Minus, SetOp}
import org.apache.calcite.rel.logical.LogicalMinus
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -59,12 +60,7 @@ class FlinkLogicalMinus(
}
-private class FlinkLogicalMinusConverter
- extends ConverterRule(
- classOf[LogicalMinus],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalMinusConverter") {
+private class FlinkLogicalMinusConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val minus = rel.asInstanceOf[LogicalMinus]
@@ -76,7 +72,12 @@ private class FlinkLogicalMinusConverter
}
object FlinkLogicalMinus {
- val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalMinus],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalMinusConverter"))
def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalMinus = {
val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
index 2395ba1426d..0c9891f0f17 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Window
import org.apache.calcite.rel.logical.LogicalWindow
import org.apache.calcite.rel.metadata.RelMdCollation
@@ -62,12 +63,7 @@ class FlinkLogicalOverAggregate(
}
-class FlinkLogicalOverAggregateConverter
- extends ConverterRule(
- classOf[LogicalWindow],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalOverAggregateConverter") {
+class FlinkLogicalOverAggregateConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val window = rel.asInstanceOf[LogicalWindow]
@@ -109,5 +105,10 @@ class FlinkLogicalOverAggregateConverter
}
object FlinkLogicalOverAggregate {
- val CONVERTER = new FlinkLogicalOverAggregateConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalOverAggregateConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalWindow],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalOverAggregateConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
index cde32de2c25..c7de2f3a24c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalRank.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.util.ImmutableBitSet
import java.util
@@ -83,12 +84,7 @@ class FlinkLogicalRank(
}
-private class FlinkLogicalRankConverter
- extends ConverterRule(
- classOf[LogicalRank],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalRankConverter") {
+private class FlinkLogicalRankConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val rank = rel.asInstanceOf[LogicalRank]
val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
@@ -105,7 +101,12 @@ private class FlinkLogicalRankConverter
}
object FlinkLogicalRank {
- val CONVERTER: ConverterRule = new FlinkLogicalRankConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalRankConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalRank],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalRankConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
index 7a376564bd8..4d98e1086a9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalScriptTransform.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitS
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import java.util.{List => JList}
@@ -53,12 +54,7 @@ class FlinkLogicalScriptTransform(
override protected def deriveRowType: RelDataType = outDataType
}
-class FlinkLogicalScriptTransformBatchConverter
- extends ConverterRule(
- classOf[LogicalScriptTransform],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalScriptTransformBatchConverter") {
+class FlinkLogicalScriptTransformBatchConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val scriptTransform = rel.asInstanceOf[LogicalScriptTransform]
@@ -74,7 +70,12 @@ class FlinkLogicalScriptTransformBatchConverter
}
object FlinkLogicalScriptTransform {
- val BATCH_CONVERTER: RelOptRule = new FlinkLogicalScriptTransformBatchConverter
+ val BATCH_CONVERTER: RelOptRule = new FlinkLogicalScriptTransformBatchConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalScriptTransform],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalScriptTransformBatchConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
index be1ff8a3c5e..d739ac6c664 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalSink, Sink}
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.hint.RelHint
import java.util
@@ -62,12 +63,7 @@ class FlinkLogicalSink(
}
-private class FlinkLogicalSinkConverter
- extends ConverterRule(
- classOf[LogicalSink],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalSinkConverter") {
+private class FlinkLogicalSinkConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[LogicalSink]
@@ -83,7 +79,12 @@ private class FlinkLogicalSinkConverter
}
object FlinkLogicalSink {
- val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalSink],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalSinkConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
index 5904dffa413..bea50d3755b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSnapshot.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Snapshot
import org.apache.calcite.rel.logical.LogicalSnapshot
import org.apache.calcite.rel.metadata.{RelMdCollation, RelMetadataQuery}
@@ -79,12 +80,7 @@ class FlinkLogicalSnapshot(
}
-class FlinkLogicalSnapshotConverter
- extends ConverterRule(
- classOf[LogicalSnapshot],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalSnapshotConverter") {
+class FlinkLogicalSnapshotConverter(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val snapshot = rel.asInstanceOf[LogicalSnapshot]
@@ -95,7 +91,12 @@ class FlinkLogicalSnapshotConverter
object FlinkLogicalSnapshot {
- val CONVERTER = new FlinkLogicalSnapshotConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalSnapshotConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalSnapshot],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalSnapshotConverter"))
def create(input: RelNode, period: RexNode): FlinkLogicalSnapshot = {
val cluster = input.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
index df51b5b11ea..f0b8ed02bc5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSort.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rel.logical.LogicalSort
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -80,12 +81,7 @@ class FlinkLogicalSort(
}
-class FlinkLogicalSortStreamConverter
- extends ConverterRule(
- classOf[LogicalSort],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalSortStreamConverter") {
+class FlinkLogicalSortStreamConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val sort = rel.asInstanceOf[LogicalSort]
@@ -94,12 +90,7 @@ class FlinkLogicalSortStreamConverter
}
}
-class FlinkLogicalSortBatchConverter
- extends ConverterRule(
- classOf[LogicalSort],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalSortBatchConverter") {
+class FlinkLogicalSortBatchConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val sort = rel.asInstanceOf[LogicalSort]
@@ -126,8 +117,18 @@ class FlinkLogicalSortBatchConverter
}
object FlinkLogicalSort {
- val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter
- val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter
+ val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalSort],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalSortBatchConverter"))
+ val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalSort],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalSortStreamConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
index f9d21a637c9..c0074cc9e0c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableAggregate.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate,
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.util.ImmutableBitSet
@@ -55,12 +56,7 @@ class FlinkLogicalTableAggregate(
}
}
-private class FlinkLogicalTableAggregateConverter
- extends ConverterRule(
- classOf[LogicalTableAggregate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalTableAggregateConverter") {
+private class FlinkLogicalTableAggregateConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalTableAggregate]
@@ -78,5 +74,10 @@ private class FlinkLogicalTableAggregateConverter
}
object FlinkLogicalTableAggregate {
- val CONVERTER: ConverterRule = new FlinkLogicalTableAggregateConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalTableAggregateConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableAggregate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalTableAggregateConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index ff067e301e1..c402ab6c770 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -22,10 +22,11 @@ import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.utils.TableSqlFunction
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableFunctionScan
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelColumnMapping
@@ -78,12 +79,7 @@ class FlinkLogicalTableFunctionScan(
}
-class FlinkLogicalTableFunctionScanConverter
- extends ConverterRule(
- classOf[LogicalTableFunctionScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalTableFunctionScanConverter") {
+class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val logicalTableFunction: LogicalTableFunctionScan = call.rel(0)
@@ -125,5 +121,10 @@ class FlinkLogicalTableFunctionScanConverter
}
object FlinkLogicalTableFunctionScan {
- val CONVERTER = new FlinkLogicalTableFunctionScanConverter
+ val CONVERTER = new FlinkLogicalTableFunctionScanConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableFunctionScan],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalTableFunctionScanConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index ba4f16dee43..fb176c99eca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
@@ -107,12 +108,7 @@ class FlinkLogicalTableSourceScan(
}
-class FlinkLogicalTableSourceScanConverter
- extends ConverterRule(
- classOf[LogicalTableScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalTableSourceScanConverter") {
+class FlinkLogicalTableSourceScanConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0)
@@ -127,7 +123,12 @@ class FlinkLogicalTableSourceScanConverter
}
object FlinkLogicalTableSourceScan {
- val CONVERTER = new FlinkLogicalTableSourceScanConverter
+ val CONVERTER = new FlinkLogicalTableSourceScanConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalTableScan],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalTableSourceScanConverter"))
def isTableSourceScan(scan: TableScan): Boolean = {
val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
index c7805e6bf2b..1bf479a7f65 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalUnion.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{SetOp, Union}
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -57,12 +58,7 @@ class FlinkLogicalUnion(
}
-private class FlinkLogicalUnionConverter
- extends ConverterRule(
- classOf[LogicalUnion],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalUnionConverter") {
+private class FlinkLogicalUnionConverter(config: Config) extends ConverterRule(config) {
/** Only translate UNION ALL. */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -80,7 +76,12 @@ private class FlinkLogicalUnionConverter
}
object FlinkLogicalUnion {
- val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalUnion],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalUnionConverter"))
def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalUnion = {
val cluster = inputs.get(0).getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
index ddd04d77b4c..b7539dc0b84 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalValues.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.logical.LogicalValues
import org.apache.calcite.rel.metadata.{RelMdCollation, RelMetadataQuery}
@@ -58,12 +59,7 @@ class FlinkLogicalValues(
}
-private class FlinkLogicalValuesConverter
- extends ConverterRule(
- classOf[LogicalValues],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalValuesConverter") {
+private class FlinkLogicalValuesConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val values = rel.asInstanceOf[LogicalValues]
@@ -76,7 +72,12 @@ private class FlinkLogicalValuesConverter
}
object FlinkLogicalValues {
- val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter()
+ val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalValues],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalValuesConverter"))
def create(
cluster: RelOptCluster,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
index 9885baabb38..795304a9f38 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalWatermarkAssign
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexNode
/**
@@ -49,12 +50,7 @@ class FlinkLogicalWatermarkAssigner(
}
-class FlinkLogicalWatermarkAssignerConverter
- extends ConverterRule(
- classOf[LogicalWatermarkAssigner],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalWatermarkAssignerConverter") {
+class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val watermark = rel.asInstanceOf[LogicalWatermarkAssigner]
@@ -67,7 +63,12 @@ class FlinkLogicalWatermarkAssignerConverter
}
object FlinkLogicalWatermarkAssigner {
- val CONVERTER = new FlinkLogicalWatermarkAssignerConverter
+ val CONVERTER = new FlinkLogicalWatermarkAssignerConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalWatermarkAssigner],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalWatermarkAssignerConverter"))
def create(
input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index b45b4e86cad..c483d660f0a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.core.Aggregate.Group
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -74,12 +75,7 @@ class FlinkLogicalWindowAggregate(
}
-class FlinkLogicalWindowAggregateConverter
- extends ConverterRule(
- classOf[LogicalWindowAggregate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalWindowAggregateConverter") {
+class FlinkLogicalWindowAggregateConverter(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
@@ -114,5 +110,10 @@ class FlinkLogicalWindowAggregateConverter
}
object FlinkLogicalWindowAggregate {
- val CONVERTER = new FlinkLogicalWindowAggregateConverter
+ val CONVERTER: ConverterRule = new FlinkLogicalWindowAggregateConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalWindowAggregate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalWindowAggregateConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index 3d9c1b2d4db..a57e65bd8c6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.util.ImmutableBitSet
@@ -78,12 +79,7 @@ class FlinkLogicalWindowTableAggregate(
}
}
-class FlinkLogicalWindowTableAggregateConverter
- extends ConverterRule(
- classOf[LogicalWindowTableAggregate],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalWindowTableAggregateConverter") {
+class FlinkLogicalWindowTableAggregateConverter(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalWindowTableAggregate]
@@ -103,5 +99,10 @@ class FlinkLogicalWindowTableAggregateConverter
}
object FlinkLogicalWindowTableAggregate {
- val CONVERTER = new FlinkLogicalWindowTableAggregateConverter
+ val CONVERTER = new FlinkLogicalWindowTableAggregateConverter(
+ Config.INSTANCE.withConversion(
+ classOf[LogicalWindowTableAggregate],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalWindowTableAggregateConverter"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
index c54a340deb9..4ce90a215f9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala
@@ -25,14 +25,10 @@ import org.apache.flink.table.planner.plan.schema.DataStreamTable
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalDataStreamTableScan]] to [[BatchPhysicalBoundedStreamScan]]. */
-class BatchPhysicalBoundedStreamScanRule
- extends ConverterRule(
- classOf[FlinkLogicalDataStreamTableScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalBoundedStreamScanRule") {
+class BatchPhysicalBoundedStreamScanRule(config: Config) extends ConverterRule(config) {
/** If the input is not a DataStreamTable, we want the TableScanRule to match instead */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -54,5 +50,10 @@ class BatchPhysicalBoundedStreamScanRule
}
object BatchPhysicalBoundedStreamScanRule {
- val INSTANCE: RelOptRule = new BatchPhysicalBoundedStreamScanRule
+ val INSTANCE: RelOptRule = new BatchPhysicalBoundedStreamScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalDataStreamTableScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalBoundedStreamScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
index 3751cf8cb09..5c6ed2f5603 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConverters._
/** Rule that converts [[FlinkLogicalCalc]] to [[BatchPhysicalCalc]]. */
-class BatchPhysicalCalcRule
- extends ConverterRule(
- classOf[FlinkLogicalCalc],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalCalcRule") {
+class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class BatchPhysicalCalcRule
}
object BatchPhysicalCalcRule {
- val INSTANCE: RelOptRule = new BatchPhysicalCalcRule
+ val INSTANCE: RelOptRule = new BatchPhysicalCalcRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalCalcRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index 638a5785d6c..9f714f2057c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -26,14 +26,10 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexNode
-class BatchPhysicalCorrelateRule
- extends ConverterRule(
- classOf[FlinkLogicalCorrelate],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalCorrelateRule") {
+class BatchPhysicalCorrelateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val join = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
@@ -86,5 +82,10 @@ class BatchPhysicalCorrelateRule
}
object BatchPhysicalCorrelateRule {
- val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule
+ val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCorrelate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalCorrelateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
index 8782b7bf306..7ce64932ef0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalDistributionRule.scala
@@ -19,21 +19,16 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalDistribution
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that matches [[FlinkLogicalDistribution]]. */
-class BatchPhysicalDistributionRule
- extends ConverterRule(
- classOf[FlinkLogicalDistribution],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchExecDistributionRule") {
+class BatchPhysicalDistributionRule(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val logicalDistribution = rel.asInstanceOf[FlinkLogicalDistribution]
@@ -61,5 +56,10 @@ class BatchPhysicalDistributionRule
}
object BatchPhysicalDistributionRule {
- val INSTANCE: RelOptRule = new BatchPhysicalDistributionRule
+ val INSTANCE: RelOptRule = new BatchPhysicalDistributionRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalDistribution],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchExecDistributionRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
index d131dd95bde..42a7b05c844 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalExpandRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExp
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalExpand]] to [[BatchPhysicalExpand]]. */
-class BatchPhysicalExpandRule
- extends ConverterRule(
- classOf[FlinkLogicalExpand],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalExpandRule") {
+class BatchPhysicalExpandRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val expand = rel.asInstanceOf[FlinkLogicalExpand]
@@ -47,5 +43,10 @@ class BatchPhysicalExpandRule
}
object BatchPhysicalExpandRule {
- val INSTANCE: RelOptRule = new BatchPhysicalExpandRule
+ val INSTANCE: RelOptRule = new BatchPhysicalExpandRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalExpand],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalExpandRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
index 1f84547d4fd..293283085bd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalIntermediateTableScanRule.scala
@@ -24,17 +24,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalInt
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that converts [[FlinkLogicalIntermediateTableScan]] to
* [[BatchPhysicalIntermediateTableScan]].
*/
-class BatchPhysicalIntermediateTableScanRule
- extends ConverterRule(
- classOf[FlinkLogicalIntermediateTableScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalIntermediateTableScanRule") {
+class BatchPhysicalIntermediateTableScanRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[FlinkLogicalIntermediateTableScan]
@@ -44,5 +40,10 @@ class BatchPhysicalIntermediateTableScanRule
}
object BatchPhysicalIntermediateTableScanRule {
- val INSTANCE: RelOptRule = new BatchPhysicalIntermediateTableScanRule
+ val INSTANCE: RelOptRule = new BatchPhysicalIntermediateTableScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalIntermediateTableScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalIntermediateTableScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
index 19e1e951e69..b5d9286c036 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
@@ -28,15 +28,11 @@ import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.{RelCollations, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
-class BatchPhysicalLegacySinkRule
- extends ConverterRule(
- classOf[FlinkLogicalLegacySink],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalLegacySinkRule") {
+class BatchPhysicalLegacySinkRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[FlinkLogicalLegacySink]
@@ -92,5 +88,10 @@ class BatchPhysicalLegacySinkRule
}
object BatchPhysicalLegacySinkRule {
- val INSTANCE: RelOptRule = new BatchPhysicalLegacySinkRule
+ val INSTANCE: RelOptRule = new BatchPhysicalLegacySinkRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalLegacySink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalLegacySinkRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
index 5f55d0b83cf..0ca49eb479c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.sources.StreamTableSource
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
/**
* Rule that converts [[FlinkLogicalLegacyTableSourceScan]] to
* [[BatchPhysicalLegacyTableSourceScan]].
*/
-class BatchPhysicalLegacyTableSourceScanRule
- extends ConverterRule(
- classOf[FlinkLogicalLegacyTableSourceScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalLegacyTableSourceScan") {
+class BatchPhysicalLegacyTableSourceScanRule(config: Config) extends ConverterRule(config) {
/** Rule must only match if TableScan targets a bounded [[StreamTableSource]] */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -66,5 +62,10 @@ class BatchPhysicalLegacyTableSourceScanRule
}
object BatchPhysicalLegacyTableSourceScanRule {
- val INSTANCE: RelOptRule = new BatchPhysicalLegacyTableSourceScanRule
+ val INSTANCE: RelOptRule = new BatchPhysicalLegacyTableSourceScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalLegacyTableSourceScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalLegacyTableSourceScan"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
index 8dd0d00973a..452bdfb7bd4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLimitRule.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexLiteral
import org.apache.calcite.sql.`type`.SqlTypeName
@@ -45,12 +46,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
* }}}
* when fetch is null.
*/
-class BatchPhysicalLimitRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalLimitRule") {
+class BatchPhysicalLimitRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -102,5 +98,10 @@ class BatchPhysicalLimitRule
}
object BatchPhysicalLimitRule {
- val INSTANCE: RelOptRule = new BatchPhysicalLimitRule
+ val INSTANCE: RelOptRule = new BatchPhysicalLimitRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalLimitRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
index 6217353b96b..abad1f05336 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConverters._
/** Rule that converts [[FlinkLogicalCalc]] to [[BatchPhysicalPythonCalc]]. */
-class BatchPhysicalPythonCalcRule
- extends ConverterRule(
- classOf[FlinkLogicalCalc],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalPythonCalcRule") {
+class BatchPhysicalPythonCalcRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class BatchPhysicalPythonCalcRule
}
object BatchPhysicalPythonCalcRule {
- val INSTANCE: RelOptRule = new BatchPhysicalPythonCalcRule
+ val INSTANCE: RelOptRule = new BatchPhysicalPythonCalcRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalPythonCalcRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
index 35c36371297..57a557ea015 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankTyp
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.{RelCollations, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
@@ -41,12 +42,7 @@ import scala.collection.JavaConversions._
* +- input of rank
* }}}
*/
-class BatchPhysicalRankRule
- extends ConverterRule(
- classOf[FlinkLogicalRank],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalRankRule") {
+class BatchPhysicalRankRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -115,5 +111,10 @@ class BatchPhysicalRankRule
}
object BatchPhysicalRankRule {
- val INSTANCE: RelOptRule = new BatchPhysicalRankRule
+ val INSTANCE: RelOptRule = new BatchPhysicalRankRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalRank],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalRankRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
index 83d25367101..f7fb384887a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalScriptTransformRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalScr
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that matches [[FlinkLogicalScriptTransform]]. */
-class BatchPhysicalScriptTransformRule
- extends ConverterRule(
- classOf[FlinkLogicalScriptTransform],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchExecScriptTrans") {
+class BatchPhysicalScriptTransformRule(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val logicalScriptTransform = rel.asInstanceOf[FlinkLogicalScriptTransform]
@@ -53,5 +49,10 @@ class BatchPhysicalScriptTransformRule
}
object BatchPhysicalScriptTransformRule {
- val INSTANCE: RelOptRule = new BatchPhysicalScriptTransformRule
+ val INSTANCE: RelOptRule = new BatchPhysicalScriptTransformRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalScriptTransform],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchExecScriptTrans"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
index 904802f2034..156a2478725 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSingleRowJoinRule.scala
@@ -26,18 +26,15 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core._
/**
* Rule that converts [[FlinkLogicalJoin]] to [[BatchPhysicalNestedLoopJoin]] if one of join input
* sides returns at most a single row.
*/
-class BatchPhysicalSingleRowJoinRule
- extends ConverterRule(
- classOf[FlinkLogicalJoin],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalSingleRowJoinRule")
+class BatchPhysicalSingleRowJoinRule(config: Config)
+ extends ConverterRule(config)
with BatchPhysicalJoinRuleBase
with BatchPhysicalNestedLoopJoinRuleBase {
@@ -85,5 +82,10 @@ class BatchPhysicalSingleRowJoinRule
}
object BatchPhysicalSingleRowJoinRule {
- val INSTANCE: RelOptRule = new BatchPhysicalSingleRowJoinRule
+ val INSTANCE: RelOptRule = new BatchPhysicalSingleRowJoinRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalJoin],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalSingleRowJoinRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
index 2317759fd54..1d055774c94 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
@@ -31,16 +31,12 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.{RelCollations, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
import scala.collection.mutable
-class BatchPhysicalSinkRule
- extends ConverterRule(
- classOf[FlinkLogicalSink],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalSinkRule") {
+class BatchPhysicalSinkRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[FlinkLogicalSink]
@@ -118,5 +114,10 @@ class BatchPhysicalSinkRule
}
object BatchPhysicalSinkRule {
- val INSTANCE = new BatchPhysicalSinkRule
+ val INSTANCE = new BatchPhysicalSinkRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalSinkRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
index c0fb28bddc6..a360ce43863 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.sql.`type`.SqlTypeName
/**
@@ -45,12 +46,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
* }}}
* when fetch is null
*/
-class BatchPhysicalSortLimitRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalSortLimitRule") {
+class BatchPhysicalSortLimitRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -105,5 +101,10 @@ class BatchPhysicalSortLimitRule
}
object BatchPhysicalSortLimitRule {
- val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule
+ val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalSortLimitRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
index ef387d485d1..06c64365f31 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import java.lang.{Boolean => JBoolean}
@@ -36,12 +37,7 @@ import java.lang.{Boolean => JBoolean}
* Rule that matches [[FlinkLogicalSort]] whose sort fields are non-empty and both `fetch` and
* `offset` are null, and converts it to [[BatchPhysicalSort]].
*/
-class BatchPhysicalSortRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalSortRule") {
+class BatchPhysicalSortRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -72,7 +68,12 @@ class BatchPhysicalSortRule
}
object BatchPhysicalSortRule {
- val INSTANCE: RelOptRule = new BatchPhysicalSortRule
+ val INSTANCE: RelOptRule = new BatchPhysicalSortRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalSortRule"))
// It is an experimental config and may be removed later.
@Experimental
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
index 0bf662cfe70..424f56d9d1b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala
@@ -27,15 +27,11 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
/** Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchPhysicalTableSourceScan]]. */
-class BatchPhysicalTableSourceScanRule
- extends ConverterRule(
- classOf[FlinkLogicalTableSourceScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalTableSourceScanRule") {
+class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {
/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -65,5 +61,10 @@ class BatchPhysicalTableSourceScanRule
}
object BatchPhysicalTableSourceScanRule {
- val INSTANCE = new BatchPhysicalTableSourceScanRule
+ val INSTANCE: ConverterRule = new BatchPhysicalTableSourceScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalTableSourceScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalTableSourceScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
index 9bd8c52b021..df193cc41b4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalUnionRule.scala
@@ -24,16 +24,12 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUni
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
/** Rule that converts [[FlinkLogicalUnion]] to [[BatchPhysicalUnion]]. */
-class BatchPhysicalUnionRule
- extends ConverterRule(
- classOf[FlinkLogicalUnion],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalUnionRule") {
+class BatchPhysicalUnionRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
call.rel(0).asInstanceOf[FlinkLogicalUnion].all
@@ -49,5 +45,10 @@ class BatchPhysicalUnionRule
}
object BatchPhysicalUnionRule {
- val INSTANCE: RelOptRule = new BatchPhysicalUnionRule
+ val INSTANCE: RelOptRule = new BatchPhysicalUnionRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalUnion],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalUnionRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
index c762791e52d..5d6e78e7475 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalVal
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalValues]] to [[BatchPhysicalValues]]. */
-class BatchPhysicalValuesRule
- extends ConverterRule(
- classOf[FlinkLogicalValues],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalValuesRule") {
+class BatchPhysicalValuesRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
@@ -42,5 +38,10 @@ class BatchPhysicalValuesRule
}
object BatchPhysicalValuesRule {
- val INSTANCE: RelOptRule = new BatchPhysicalValuesRule
+ val INSTANCE: RelOptRule = new BatchPhysicalValuesRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalValues],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalValuesRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
index d38365b6502..c8d90a8cca2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowTableFunctionRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil.convertToWindowingSt
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexCall
/**
* Rule to convert a [[FlinkLogicalTableFunctionScan]] with window table function call into a
* [[BatchPhysicalWindowTableFunction]].
*/
-class BatchPhysicalWindowTableFunctionRule
- extends ConverterRule(
- classOf[FlinkLogicalTableFunctionScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.BATCH_PHYSICAL,
- "BatchPhysicalWindowTableFunctionRule") {
+class BatchPhysicalWindowTableFunctionRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -60,5 +56,10 @@ class BatchPhysicalWindowTableFunctionRule
}
object BatchPhysicalWindowTableFunctionRule {
- val INSTANCE: RelOptRule = new BatchPhysicalWindowTableFunctionRule
+ val INSTANCE: RelOptRule = new BatchPhysicalWindowTableFunctionRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalTableFunctionScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchPhysicalWindowTableFunctionRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
index 0bb59ff21ec..1571b23d24b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConverters._
/** Rule that converts [[FlinkLogicalCalc]] to [[StreamPhysicalCalc]]. */
-class StreamPhysicalCalcRule
- extends ConverterRule(
- classOf[FlinkLogicalCalc],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalCalcRule") {
+class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0)
@@ -52,5 +48,10 @@ class StreamPhysicalCalcRule
}
object StreamPhysicalCalcRule {
- val INSTANCE: RelOptRule = new StreamPhysicalCalcRule
+ val INSTANCE: RelOptRule = new StreamPhysicalCalcRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalCalcRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
index 68593c09383..83cf99c3177 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
@@ -29,15 +29,11 @@ import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.{RexNode, RexProgram, RexProgramBuilder}
/** Rule that converts [[FlinkLogicalCorrelate]] to [[StreamPhysicalCorrelate]]. */
-class StreamPhysicalCorrelateRule
- extends ConverterRule(
- classOf[FlinkLogicalCorrelate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalCorrelateRule") {
+class StreamPhysicalCorrelateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val correlate: FlinkLogicalCorrelate = call.rel(0)
@@ -102,7 +98,12 @@ class StreamPhysicalCorrelateRule
}
object StreamPhysicalCorrelateRule {
- val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule
+ val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCorrelate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalCorrelateRule"))
def getMergedCalc(calc: FlinkLogicalCalc): FlinkLogicalCalc = {
val child = calc.getInput match {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
index 2774df0c194..433fb3af17c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala
@@ -25,14 +25,10 @@ import org.apache.flink.table.planner.plan.schema.DataStreamTable
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalDataStreamTableScan]] to [[StreamPhysicalDataStreamScan]]. */
-class StreamPhysicalDataStreamScanRule
- extends ConverterRule(
- classOf[FlinkLogicalDataStreamTableScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalDataStreamScanRule") {
+class StreamPhysicalDataStreamScanRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: FlinkLogicalDataStreamTableScan = call.rel(0)
@@ -55,5 +51,10 @@ class StreamPhysicalDataStreamScanRule
}
object StreamPhysicalDataStreamScanRule {
- val INSTANCE: RelOptRule = new StreamPhysicalDataStreamScanRule
+ val INSTANCE: RelOptRule = new StreamPhysicalDataStreamScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalDataStreamTableScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalDataStreamScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
index 03f5420536a..e232a4646be 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.RankUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that matches [[FlinkLogicalRank]] which is sorted by time attribute and limits 1 and its
@@ -45,12 +46,7 @@ import org.apache.calcite.rel.convert.ConverterRule
* rowtime DESC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to
* StreamExecDeduplicate which keeps last row in rowtime.
*/
-class StreamPhysicalDeduplicateRule
- extends ConverterRule(
- classOf[FlinkLogicalRank],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalDeduplicateRule") {
+class StreamPhysicalDeduplicateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -94,5 +90,10 @@ class StreamPhysicalDeduplicateRule
}
object StreamPhysicalDeduplicateRule {
- val INSTANCE = new StreamPhysicalDeduplicateRule
+ val INSTANCE = new StreamPhysicalDeduplicateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalRank],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalDeduplicateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
index fb61c316694..850ea926623 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExpandRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalE
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalExpand]] to [[StreamPhysicalExpand]]. */
-class StreamPhysicalExpandRule
- extends ConverterRule(
- classOf[FlinkLogicalExpand],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalExpandRule") {
+class StreamPhysicalExpandRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val expand = rel.asInstanceOf[FlinkLogicalExpand]
@@ -47,5 +43,10 @@ class StreamPhysicalExpandRule
}
object StreamPhysicalExpandRule {
- val INSTANCE: RelOptRule = new StreamPhysicalExpandRule
+ val INSTANCE: RelOptRule = new StreamPhysicalExpandRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalExpand],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalExpandRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index 371788c5c07..a7f71c2e515 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -29,17 +29,13 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Aggregate.Group
import scala.collection.JavaConversions._
/** Rule to convert a [[FlinkLogicalAggregate]] into a [[StreamPhysicalGroupAggregate]]. */
-class StreamPhysicalGroupAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalGroupAggregateRule") {
+class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: FlinkLogicalAggregate = call.rel(0)
@@ -86,5 +82,10 @@ class StreamPhysicalGroupAggregateRule
}
object StreamPhysicalGroupAggregateRule {
- val INSTANCE: RelOptRule = new StreamPhysicalGroupAggregateRule
+ val INSTANCE: RelOptRule = new StreamPhysicalGroupAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalGroupAggregateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
index 6b07092398e..01eb41a1c12 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupTableAggregateRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
/**
* Rule to convert a [[FlinkLogicalTableAggregate]] into a [[StreamPhysicalGroupTableAggregate]].
*/
-class StreamPhysicalGroupTableAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalTableAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalGroupTableAggregateRule") {
+class StreamPhysicalGroupTableAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: FlinkLogicalTableAggregate = call.rel(0)
@@ -70,5 +66,10 @@ class StreamPhysicalGroupTableAggregateRule
}
object StreamPhysicalGroupTableAggregateRule {
- val INSTANCE: StreamPhysicalGroupTableAggregateRule = new StreamPhysicalGroupTableAggregateRule()
+ val INSTANCE: StreamPhysicalGroupTableAggregateRule = new StreamPhysicalGroupTableAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalTableAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalGroupTableAggregateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
index 2fe94c717af..9bfaebdd07c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
@@ -30,6 +29,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Aggregate.Group
import scala.collection.JavaConversions._
@@ -37,12 +37,7 @@ import scala.collection.JavaConversions._
/**
* Rule to convert a [[FlinkLogicalWindowAggregate]] into a [[StreamPhysicalGroupWindowAggregate]].
*/
-class StreamPhysicalGroupWindowAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalWindowAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalGroupWindowAggregateRule") {
+class StreamPhysicalGroupWindowAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: FlinkLogicalWindowAggregate = call.rel(0)
@@ -88,5 +83,10 @@ class StreamPhysicalGroupWindowAggregateRule
}
object StreamPhysicalGroupWindowAggregateRule {
- val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowAggregateRule
+ val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalWindowAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalGroupWindowAggregateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
index 7379ef41dc3..e5f6dd35d83 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowTableAggregateRule.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
@@ -35,12 +36,7 @@ import scala.collection.JavaConversions._
* Rule to convert a [[FlinkLogicalWindowTableAggregate]] into a
* [[StreamPhysicalGroupWindowTableAggregate]].
*/
-class StreamPhysicalGroupWindowTableAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalWindowTableAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalGroupWindowTableAggregateRule") {
+class StreamPhysicalGroupWindowTableAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: FlinkLogicalWindowTableAggregate = call.rel(0)
@@ -86,5 +82,10 @@ class StreamPhysicalGroupWindowTableAggregateRule
}
object StreamPhysicalGroupWindowTableAggregateRule {
- val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowTableAggregateRule
+ val INSTANCE: RelOptRule = new StreamPhysicalGroupWindowTableAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalWindowTableAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalGroupWindowTableAggregateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
index 31119899586..30752544b14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntermediateTableScanRule.scala
@@ -24,17 +24,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalI
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that converts [[FlinkLogicalIntermediateTableScan]] to
* [[StreamPhysicalIntermediateTableScan]].
*/
-class StreamPhysicalIntermediateTableScanRule
- extends ConverterRule(
- classOf[FlinkLogicalIntermediateTableScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalIntermediateTableScanRule") {
+class StreamPhysicalIntermediateTableScanRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[FlinkLogicalIntermediateTableScan]
@@ -44,5 +40,10 @@ class StreamPhysicalIntermediateTableScanRule
}
object StreamPhysicalIntermediateTableScanRule {
- val INSTANCE: RelOptRule = new StreamPhysicalIntermediateTableScanRule
+ val INSTANCE: RelOptRule = new StreamPhysicalIntermediateTableScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalIntermediateTableScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalIntermediateTableScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
index 9170336a2d2..a710bd3f008 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
@@ -27,15 +27,11 @@ import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
-class StreamPhysicalLegacySinkRule
- extends ConverterRule(
- classOf[FlinkLogicalLegacySink],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalLegacySinkRule") {
+class StreamPhysicalLegacySinkRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[FlinkLogicalLegacySink]
@@ -89,5 +85,10 @@ class StreamPhysicalLegacySinkRule
}
object StreamPhysicalLegacySinkRule {
- val INSTANCE: RelOptRule = new StreamPhysicalLegacySinkRule
+ val INSTANCE: RelOptRule = new StreamPhysicalLegacySinkRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalLegacySink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalLegacySinkRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
index 80705361b2a..3df07bac263 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.sources.StreamTableSource
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
/**
* Rule that converts [[FlinkLogicalLegacyTableSourceScan]] to
* [[StreamPhysicalLegacyTableSourceScan]].
*/
-class StreamPhysicalLegacyTableSourceScanRule
- extends ConverterRule(
- classOf[FlinkLogicalLegacyTableSourceScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalLegacyTableSourceScanRule") {
+class StreamPhysicalLegacyTableSourceScanRule(config: Config) extends ConverterRule(config) {
/** Rule must only match if TableScan targets a [[StreamTableSource]] */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -67,5 +63,10 @@ class StreamPhysicalLegacyTableSourceScanRule
}
object StreamPhysicalLegacyTableSourceScanRule {
- val INSTANCE: RelOptRule = new StreamPhysicalLegacyTableSourceScanRule
+ val INSTANCE: RelOptRule = new StreamPhysicalLegacyTableSourceScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalLegacyTableSourceScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalLegacyTableSourceScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
index 6f69285c234..5ffd3ea872f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLimitRule.scala
@@ -25,17 +25,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalL
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that matches [[FlinkLogicalSort]] with empty sort fields, and converts it to
* [[StreamPhysicalLimit]].
*/
-class StreamPhysicalLimitRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalLimitRule") {
+class StreamPhysicalLimitRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -65,5 +61,10 @@ class StreamPhysicalLimitRule
}
object StreamPhysicalLimitRule {
- val INSTANCE: RelOptRule = new StreamPhysicalLimitRule
+ val INSTANCE: RelOptRule = new StreamPhysicalLimitRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalLimitRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
index 9070b111fc2..60fdaceb93c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConverters._
@@ -35,12 +36,7 @@ import scala.collection.JavaConverters._
* StreamExecOverAggregate only supports one [[org.apache.calcite.rel.core.Window.Group]], else
* throw exception now
*/
-class StreamPhysicalOverAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalOverAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalOverAggregateRule") {
+class StreamPhysicalOverAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val logicWindow: FlinkLogicalOverAggregate =
@@ -82,5 +78,10 @@ class StreamPhysicalOverAggregateRule
}
object StreamPhysicalOverAggregateRule {
- val INSTANCE: RelOptRule = new StreamPhysicalOverAggregateRule
+ val INSTANCE: RelOptRule = new StreamPhysicalOverAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalOverAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalOverAggregateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
index 9f26fba85ac..0e51aaa9dff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCalcRule.scala
@@ -25,16 +25,12 @@ import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConverters._
/** Rule that converts [[FlinkLogicalCalc]] to [[StreamPhysicalPythonCalc]]. */
-class StreamPhysicalPythonCalcRule
- extends ConverterRule(
- classOf[FlinkLogicalCalc],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalPythonCalcRule") {
+class StreamPhysicalPythonCalcRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0)
@@ -57,5 +53,10 @@ class StreamPhysicalPythonCalcRule
}
object StreamPhysicalPythonCalcRule {
- val INSTANCE: RelOptRule = new StreamPhysicalPythonCalcRule
+ val INSTANCE: RelOptRule = new StreamPhysicalPythonCalcRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalPythonCalcRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
index 52ad809acd7..08c3616ae1f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
@@ -20,24 +20,19 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. NOTES: the rank can
* not be converted to [[StreamPhysicalDeduplicate]].
*/
-class StreamPhysicalRankRule
- extends ConverterRule(
- classOf[FlinkLogicalRank],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalRankRule") {
+class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -73,5 +68,10 @@ class StreamPhysicalRankRule
}
object StreamPhysicalRankRule {
- val INSTANCE: RelOptRule = new StreamPhysicalRankRule
+ val INSTANCE: RelOptRule = new StreamPhysicalRankRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalRank],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalRankRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index 8f159eaaf8b..c7e4158d2d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -30,16 +30,12 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
import scala.collection.mutable
-class StreamPhysicalSinkRule
- extends ConverterRule(
- classOf[FlinkLogicalSink],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalSinkRule") {
+class StreamPhysicalSinkRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val sink = rel.asInstanceOf[FlinkLogicalSink]
@@ -109,5 +105,10 @@ class StreamPhysicalSinkRule
}
object StreamPhysicalSinkRule {
- val INSTANCE = new StreamPhysicalSinkRule
+ val INSTANCE = new StreamPhysicalSinkRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalSinkRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
index f3330f55ace..9bf4c51eb23 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala
@@ -26,17 +26,13 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset,
* and converts it to [[StreamPhysicalSortLimit]].
*/
-class StreamPhysicalSortLimitRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalSortLimitRule") {
+class StreamPhysicalSortLimitRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -67,5 +63,10 @@ class StreamPhysicalSortLimitRule
}
object StreamPhysicalSortLimitRule {
- val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule
+ val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalSortLimitRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
index adf757cbfca..63958f7235c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortRule.scala
@@ -25,17 +25,13 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalS
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that matches [[FlinkLogicalSort]] which `fetch` is null or `fetch` is 0, and converts it to
* [[StreamPhysicalSort]].
*/
-class StreamPhysicalSortRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalSortRule") {
+class StreamPhysicalSortRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -59,5 +55,10 @@ class StreamPhysicalSortRule
}
object StreamPhysicalSortRule {
- val INSTANCE: RelOptRule = new StreamPhysicalSortRule
+ val INSTANCE: RelOptRule = new StreamPhysicalSortRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalSortRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
index e7c390e07bd..49632bf60e0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableScan
/**
@@ -38,12 +39,7 @@ import org.apache.calcite.rel.core.TableScan
* <p>Depends whether this is a scan source, this rule will also generate
* [[StreamPhysicalChangelogNormalize]] to materialize the upsert stream.
*/
-class StreamPhysicalTableSourceScanRule
- extends ConverterRule(
- classOf[FlinkLogicalTableSourceScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalTableSourceScanRule") {
+class StreamPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {
/** Rule must only match if TableScan targets a [[ScanTableSource]] */
override def matches(call: RelOptRuleCall): Boolean = {
@@ -99,5 +95,10 @@ class StreamPhysicalTableSourceScanRule
}
object StreamPhysicalTableSourceScanRule {
- val INSTANCE = new StreamPhysicalTableSourceScanRule
+ val INSTANCE = new StreamPhysicalTableSourceScanRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalTableSourceScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalTableSourceScanRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
index 78ccf2dfcf0..6005e096383 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalSortRule.scala
@@ -27,17 +27,13 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/**
* Rule that matches [[FlinkLogicalSort]] which is sorted by time attribute in ascending order and
* its `fetch` and `offset` are null, and converts it to [[StreamPhysicalTemporalSort]].
*/
-class StreamPhysicalTemporalSortRule
- extends ConverterRule(
- classOf[FlinkLogicalSort],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalTemporalSortRule") {
+class StreamPhysicalTemporalSortRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -62,7 +58,12 @@ class StreamPhysicalTemporalSortRule
}
object StreamPhysicalTemporalSortRule {
- val INSTANCE: RelOptRule = new StreamPhysicalTemporalSortRule
+ val INSTANCE: RelOptRule = new StreamPhysicalTemporalSortRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalSort],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalTemporalSortRule"))
/**
* Whether the given sort could be converted to [[StreamPhysicalTemporalSort]].
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
index 3e133b6325e..00131e78332 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalUnionRule.scala
@@ -24,16 +24,12 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalU
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import scala.collection.JavaConversions._
/** Rule that converts [[FlinkLogicalUnion]] to [[StreamPhysicalUnion]]. */
-class StreamPhysicalUnionRule
- extends ConverterRule(
- classOf[FlinkLogicalUnion],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalUnionRule") {
+class StreamPhysicalUnionRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
call.rel(0).asInstanceOf[FlinkLogicalUnion].all
@@ -49,5 +45,10 @@ class StreamPhysicalUnionRule
}
object StreamPhysicalUnionRule {
- val INSTANCE: RelOptRule = new StreamPhysicalUnionRule
+ val INSTANCE: RelOptRule = new StreamPhysicalUnionRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalUnion],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalUnionRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
index 3d6fcd3dbc7..eef97aa0e06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalValuesRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalV
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalValues]] to [[StreamPhysicalValues]]. */
-class StreamPhysicalValuesRule
- extends ConverterRule(
- classOf[FlinkLogicalValues],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalValuesRule") {
+class StreamPhysicalValuesRule(config: Config) extends ConverterRule(config) {
def convert(rel: RelNode): RelNode = {
val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
@@ -42,5 +38,10 @@ class StreamPhysicalValuesRule
}
object StreamPhysicalValuesRule {
- val INSTANCE: RelOptRule = new StreamPhysicalValuesRule
+ val INSTANCE: RelOptRule = new StreamPhysicalValuesRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalValues],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalValuesRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
index 337e2ec1c1e..c6b62b0d09e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
@@ -24,14 +24,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalW
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. */
-class StreamPhysicalWatermarkAssignerRule
- extends ConverterRule(
- classOf[FlinkLogicalWatermarkAssigner],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalWatermarkAssignerRule") {
+class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) {
override def convert(rel: RelNode): RelNode = {
val watermarkAssigner = rel.asInstanceOf[FlinkLogicalWatermarkAssigner]
@@ -50,5 +46,10 @@ class StreamPhysicalWatermarkAssignerRule
}
object StreamPhysicalWatermarkAssignerRule {
- val INSTANCE = new StreamPhysicalWatermarkAssignerRule
+ val INSTANCE = new StreamPhysicalWatermarkAssignerRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalWatermarkAssigner],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalWatermarkAssignerRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
index e33bac81876..4778c4c9c0a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.runtime.groupwindow._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Aggregate.Group
import org.apache.calcite.rex.{RexInputRef, RexProgram}
@@ -39,12 +40,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/** Rule to convert a [[FlinkLogicalAggregate]] into a [[StreamPhysicalWindowAggregate]]. */
-class StreamPhysicalWindowAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalWindowAggregateRule") {
+class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: FlinkLogicalAggregate = call.rel(0)
@@ -247,7 +243,12 @@ class StreamPhysicalWindowAggregateRule
}
object StreamPhysicalWindowAggregateRule {
- val INSTANCE = new StreamPhysicalWindowAggregateRule
+ val INSTANCE = new StreamPhysicalWindowAggregateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalWindowAggregateRule"))
private val WINDOW_START: String = "window_start"
private val WINDOW_END: String = "window_end"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
index e145ceb3fb3..c45229062d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowDeduplicateRule.scala
@@ -28,14 +28,10 @@ import org.apache.flink.table.planner.plan.utils.{RankUtil, WindowUtil}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule to convert a [[FlinkLogicalRank]] into a [[StreamPhysicalWindowDeduplicate]]. */
-class StreamPhysicalWindowDeduplicateRule
- extends ConverterRule(
- classOf[FlinkLogicalRank],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalWindowDeduplicateRule") {
+class StreamPhysicalWindowDeduplicateRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -89,5 +85,10 @@ class StreamPhysicalWindowDeduplicateRule
}
object StreamPhysicalWindowDeduplicateRule {
- val INSTANCE = new StreamPhysicalWindowDeduplicateRule
+ val INSTANCE = new StreamPhysicalWindowDeduplicateRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalRank],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalWindowDeduplicateRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
index 7193295df3a..7259e3495e0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowRankRule.scala
@@ -28,14 +28,10 @@ import org.apache.flink.table.planner.plan.utils.{RankUtil, WindowUtil}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
/** Rule to convert a [[FlinkLogicalRank]] into a [[StreamPhysicalWindowRank]]. */
-class StreamPhysicalWindowRankRule
- extends ConverterRule(
- classOf[FlinkLogicalRank],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalWindowRankRule") {
+class StreamPhysicalWindowRankRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -89,6 +85,11 @@ class StreamPhysicalWindowRankRule
}
object StreamPhysicalWindowRankRule {
- val INSTANCE = new StreamPhysicalWindowRankRule
+ val INSTANCE = new StreamPhysicalWindowRankRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalRank],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalWindowRankRule"))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
index 23dc245e050..6ceeef69ac1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowTableFunctionRule.scala
@@ -26,18 +26,14 @@ import org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingS
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rex.RexCall
/**
* Rule to convert a [[FlinkLogicalTableFunctionScan]] with window table function call into a
* [[StreamPhysicalWindowTableFunction]].
*/
-class StreamPhysicalWindowTableFunctionRule
- extends ConverterRule(
- classOf[FlinkLogicalTableFunctionScan],
- FlinkConventions.LOGICAL,
- FlinkConventions.STREAM_PHYSICAL,
- "StreamPhysicalWindowTableFunctionRule") {
+class StreamPhysicalWindowTableFunctionRule(config: Config) extends ConverterRule(config) {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -64,5 +60,10 @@ class StreamPhysicalWindowTableFunctionRule
}
object StreamPhysicalWindowTableFunctionRule {
- val INSTANCE = new StreamPhysicalWindowTableFunctionRule
+ val INSTANCE = new StreamPhysicalWindowTableFunctionRule(
+ Config.INSTANCE.withConversion(
+ classOf[FlinkLogicalTableFunctionScan],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamPhysicalWindowTableFunctionRule"))
}