You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 06:40:16 UTC
[flink] 02/02: [FLINK-13587][table-planner-blink] Fix some operator
names are not set in blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4517949700b0401923eaa0bd6ec6704f05b7bdf3
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 5 22:39:18 2019 +0800
[FLINK-13587][table-planner-blink] Fix some operator names are not set in blink planner
This closes #9363
---
...iterImpl.java => RelDescriptionWriterImpl.java} | 23 ++--
.../planner/codegen/CorrelateCodeGenerator.scala | 7 +-
.../table/planner/delegation/BatchPlanner.scala | 2 +-
.../table/planner/delegation/StreamPlanner.scala | 2 +-
.../table/planner/plan/nodes/FlinkRelNode.scala | 27 ++---
.../table/planner/plan/nodes/calcite/Expand.scala | 22 ++--
.../planner/plan/nodes/common/CommonCalc.scala | 7 +-
.../plan/nodes/common/CommonLookupJoin.scala | 6 +-
.../plan/nodes/common/CommonPhysicalJoin.scala | 7 +-
.../plan/nodes/physical/batch/BatchExecCalc.scala | 3 +-
.../nodes/physical/batch/BatchExecCorrelate.scala | 4 +-
.../nodes/physical/batch/BatchExecExpand.scala | 3 +-
.../batch/BatchExecGroupAggregateBase.scala | 14 +--
.../physical/batch/BatchExecHashAggregate.scala | 5 -
.../batch/BatchExecHashAggregateBase.scala | 4 +-
.../nodes/physical/batch/BatchExecHashJoin.scala | 12 +-
.../batch/BatchExecHashWindowAggregate.scala | 4 -
.../batch/BatchExecHashWindowAggregateBase.scala | 4 +-
.../plan/nodes/physical/batch/BatchExecLimit.scala | 7 +-
.../batch/BatchExecLocalHashAggregate.scala | 2 -
.../batch/BatchExecLocalHashWindowAggregate.scala | 4 -
.../batch/BatchExecLocalSortAggregate.scala | 3 -
.../batch/BatchExecLocalSortWindowAggregate.scala | 2 -
.../physical/batch/BatchExecNestedLoopJoin.scala | 13 +-
.../physical/batch/BatchExecOverAggregate.scala | 6 +-
.../plan/nodes/physical/batch/BatchExecRank.scala | 10 +-
.../plan/nodes/physical/batch/BatchExecSort.scala | 2 +-
.../physical/batch/BatchExecSortAggregate.scala | 6 -
.../batch/BatchExecSortAggregateBase.scala | 4 +-
.../nodes/physical/batch/BatchExecSortLimit.scala | 9 +-
.../physical/batch/BatchExecSortMergeJoin.scala | 10 +-
.../batch/BatchExecSortWindowAggregate.scala | 5 -
.../batch/BatchExecSortWindowAggregateBase.scala | 4 +-
.../nodes/physical/batch/BatchExecValues.scala | 1 +
.../nodes/physical/stream/StreamExecCalc.scala | 2 +-
.../physical/stream/StreamExecCorrelate.scala | 1 +
.../physical/stream/StreamExecDeduplicate.scala | 13 +-
.../nodes/physical/stream/StreamExecExpand.scala | 3 +-
.../stream/StreamExecGlobalGroupAggregate.scala | 2 +-
.../physical/stream/StreamExecGroupAggregate.scala | 2 +-
.../stream/StreamExecGroupWindowAggregate.scala | 10 +-
.../StreamExecIncrementalGroupAggregate.scala | 2 +-
.../nodes/physical/stream/StreamExecJoin.scala | 9 +-
.../nodes/physical/stream/StreamExecLimit.scala | 2 +-
.../stream/StreamExecLocalGroupAggregate.scala | 2 +-
.../nodes/physical/stream/StreamExecMatch.scala | 2 +-
.../physical/stream/StreamExecOverAggregate.scala | 2 +-
.../nodes/physical/stream/StreamExecRank.scala | 16 +--
.../nodes/physical/stream/StreamExecSort.scala | 2 +-
.../physical/stream/StreamExecSortLimit.scala | 9 +-
.../physical/stream/StreamExecTemporalJoin.scala | 8 +-
.../physical/stream/StreamExecTemporalSort.scala | 4 +-
.../nodes/physical/stream/StreamExecValues.scala | 1 +
.../stream/StreamExecWatermarkAssigner.scala | 18 +--
.../physical/stream/StreamExecWindowJoin.scala | 11 +-
.../planner/plan/utils/ExecNodePlanDumper.scala | 2 +-
.../table/planner/plan/utils/FlinkRelOptUtil.scala | 2 +-
.../table/planner/plan/utils/RelExplainUtil.scala | 81 +++----------
.../flink/table/planner/plan/utils/ScanUtil.scala | 5 +-
.../resources/explain/testGetStatsFromCatalog.out | 6 +-
.../apache/flink/table/api/batch/ExplainTest.xml | 104 ++++++++--------
.../apache/flink/table/api/stream/ExplainTest.xml | 132 ++++++++++-----------
.../table/planner/plan/batch/sql/LimitTest.xml | 8 +-
.../planner/plan/batch/sql/SetOperatorsTest.xml | 6 +-
.../table/planner/plan/batch/sql/SortLimitTest.xml | 8 +-
.../table/planner/plan/batch/sql/UnnestTest.xml | 20 ++--
.../table/planner/plan/batch/sql/ValuesTest.xml | 22 ++--
.../plan/batch/sql/join/BroadcastHashJoinTest.xml | 2 +-
.../plan/batch/sql/join/NestedLoopJoinTest.xml | 6 +-
.../plan/batch/sql/join/SortMergeJoinTest.xml | 2 +-
.../logical/CalcPruneAggregateCallRuleTest.xml | 10 +-
.../rules/logical/FlinkAggregateRemoveRuleTest.xml | 8 +-
.../rules/logical/FlinkLimit0RemoveRuleTest.xml | 18 +--
.../rules/logical/FlinkPruneEmptyRulesTest.xml | 2 +-
.../plan/rules/logical/LogicalUnnestRuleTest.xml | 20 ++--
.../logical/ProjectPruneAggregateCallRuleTest.xml | 4 +-
.../logical/subquery/SubQuerySemiJoinTest.xml | 4 +-
.../table/planner/plan/stream/sql/LimitTest.xml | 24 ++--
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 30 ++---
.../table/planner/plan/stream/sql/RankTest.xml | 4 +-
.../planner/plan/stream/sql/SetOperatorsTest.xml | 6 +-
.../planner/plan/stream/sql/SortLimitTest.xml | 32 ++---
.../table/planner/plan/stream/sql/UnnestTest.xml | 20 ++--
.../table/planner/plan/stream/sql/ValuesTest.xml | 22 ++--
.../planner/plan/stream/sql/join/JoinTest.xml | 2 +-
.../plan/stream/sql/join/WindowJoinTest.xml | 2 +-
.../flink/table/planner/utils/TableTestBase.scala | 26 ++--
87 files changed, 405 insertions(+), 600 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
similarity index 81%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
index b7f6d25..ebdc0d2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
@@ -34,17 +34,19 @@ import java.util.List;
/**
* Converts an {@link RelNode} to string with only the information from the RelNode itself
* without the information from its inputs. This is mainly used to generate
- * {@link FlinkRelNode#getDisplayName()}.
+ * {@link FlinkRelNode#getRelDetailedDescription()}.
*/
-public class RelDisplayNameWriterImpl implements RelWriter {
+public class RelDescriptionWriterImpl implements RelWriter {
- private static final String STREAM_EXEC = "StreamExec";
- private static final String BATCH_EXEC = "BatchExec";
+ /**
+ * All the supported prefixes of RelNode class names (i.e. all the implementations of {@link FlinkRelNode}).
+ */
+ private static final String[] REL_TYPE_NAME_PREFIXES = {"StreamExec", "BatchExec", "FlinkLogical"};
private final PrintWriter pw;
private final List<Pair<String, Object>> values = new ArrayList<>();
- public RelDisplayNameWriterImpl(PrintWriter pw) {
+ public RelDescriptionWriterImpl(PrintWriter pw) {
this.pw = pw;
}
@@ -95,12 +97,11 @@ public class RelDisplayNameWriterImpl implements RelWriter {
private String getNodeTypeName(RelNode rel) {
String typeName = rel.getRelTypeName();
- if (typeName.startsWith(STREAM_EXEC)) {
- return typeName.substring(STREAM_EXEC.length());
- } else if (typeName.startsWith(BATCH_EXEC)) {
- return typeName.substring(BATCH_EXEC.length());
- } else {
- throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
+ for (String prefix : REL_TYPE_NAME_PREFIXES) {
+ if (typeName.startsWith(prefix)) {
+ return typeName.substring(prefix.length());
+ }
}
+ throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index c292c64..31bed9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -135,12 +135,7 @@ object CorrelateCodeGenerator {
new OneInputTransformation(
inputTransformation,
- RelExplainUtil.correlateOpName(
- inputRelType,
- rexCall,
- sqlFunction,
- outDataType,
- expression),
+ ruleDescription,
substituteStreamOperator,
BaseRowTypeInfo.of(returnType),
parallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 213be6a..56a2fa2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -104,7 +104,7 @@ class BatchPlanner(
val explainLevel = if (extended) {
SqlExplainLevel.ALL_ATTRIBUTES
} else {
- SqlExplainLevel.EXPPLAN_ATTRIBUTES
+ SqlExplainLevel.DIGEST_ATTRIBUTES
}
sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 091083a..e5add45 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -95,7 +95,7 @@ class StreamPlanner(
val (explainLevel, withRetractTraits) = if (extended) {
(SqlExplainLevel.ALL_ATTRIBUTES, true)
} else {
- (SqlExplainLevel.EXPPLAN_ATTRIBUTES, false)
+ (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
}
sb.append(ExecNodePlanDumper.dagToString(
execNodes,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
index 62a4d45..75fddec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.planner.plan.nodes
-import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl
+
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlAsOperator
import org.apache.calcite.sql.SqlKind._
-import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
-import org.apache.flink.table.planner.plan.utils.RelDisplayNameWriterImpl
import java.io.{PrintWriter, StringWriter}
@@ -35,13 +36,17 @@ import scala.collection.JavaConversions._
trait FlinkRelNode extends RelNode {
/**
- * Returns the display name of the RelNode. The display name are usually used to be displayed
- * in log or Web UI.
+ * Returns a string which describes the detailed information of the relational expression
+ * with the attributes that contribute to the plan output.
+ *
+ * This method leverages [[RelNode#explain]] with
+ * [[org.apache.calcite.sql.SqlExplainLevel.EXPPLAN_ATTRIBUTES]] explain level to generate
+ * the description.
*/
- def getDisplayName: String = {
+ def getRelDetailedDescription: String = {
val sw = new StringWriter
val pw = new PrintWriter(sw)
- val relWriter = new RelDisplayNameWriterImpl(pw)
+ val relWriter = new RelDescriptionWriterImpl(pw)
this.explain(relWriter)
sw.toString
}
@@ -114,14 +119,6 @@ trait FlinkRelNode extends RelNode {
throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
}
}
-
- def getExpressionFormat(pw: RelWriter): ExpressionFormat = pw match {
- // infix format is more readable for displaying
- case _: RelDisplayNameWriterImpl => ExpressionFormat.Infix
- // traditional writer prefers prefix expression format, e.g. +(x, y)
- case _ => ExpressionFormat.Prefix
- }
-
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
index 093f87e..b5ea110 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlExplainLevel
import org.apache.calcite.util.Litmus
import java.util
@@ -82,14 +83,19 @@ abstract class Expand(
override def explainTerms(pw: RelWriter): RelWriter = {
val names = outputRowType.getFieldNames
- val terms = projects.map {
- project =>
- project.zipWithIndex.map {
- case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
- case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
- case (o, _) => s"$o"
- }.mkString("{", ", ", "}")
- }.mkString(", ")
+ val terms = if (pw.getDetailLevel == SqlExplainLevel.EXPPLAN_ATTRIBUTES) {
+ // improve the readability
+ names.mkString(", ")
+ } else {
+ projects.map {
+ project =>
+ project.zipWithIndex.map {
+ case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
+ case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
+ case (o, _) => s"$o"
+ }.mkString("{", ", ", "}")
+ }.mkString(", ")
+ }
super.explainTerms(pw).item("projects", terms)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
index d50b11f..532e9df 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
@@ -20,8 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.common
import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil.conditionToString
-
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat}
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -60,9 +59,9 @@ abstract class CommonCalc(
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("input", getInput)
- .item("select", projectionToString())
+ .item("select", projectionToString(preferExpressionFormat(pw)))
.itemIf("where",
- conditionToString(calcProgram, getExpressionString),
+ conditionToString(calcProgram, getExpressionString, preferExpressionFormat(pw)),
calcProgram.getCondition != null)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 88800da..a6163da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil._
import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
import org.apache.flink.table.planner.utils.TableConfigUtils.getMillisecondFromConfigDuration
import org.apache.flink.table.runtime.operators.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
@@ -126,7 +127,8 @@ abstract class CommonLookupJoin(
val resultFieldNames = getRowType.getFieldNames.asScala.toArray
val lookupableSource = tableSource.asInstanceOf[LookupableTableSource[_]]
val whereString = calcOnTemporalTable match {
- case Some(calc) => RelExplainUtil.conditionToString(calc, getExpressionString)
+ case Some(calc) => RelExplainUtil.conditionToString(
+ calc, getExpressionString, preferExpressionFormat(pw))
case None => "N/A"
}
@@ -343,7 +345,7 @@ abstract class CommonLookupJoin(
new OneInputTransformation(
inputTransformation,
- "LookupJoin",
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(resultRowType),
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
index a7844cb..7cff990 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
@@ -19,7 +19,8 @@
package org.apache.flink.table.planner.plan.nodes.common
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
import org.apache.flink.table.runtime.operators.join.FlinkJoinType
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -78,8 +79,8 @@ abstract class CommonPhysicalJoin(
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("left", getLeft).input("right", getRight)
.item("joinType", flinkJoinType.toString)
- .item("where",
- RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString))
+ .item("where", getExpressionString(
+ getCondition, inputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
.item("select", getRowType.getFieldNames.mkString(", "))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index 3d03932..aefcd2f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
import org.apache.calcite.plan._
@@ -158,7 +157,7 @@ class BatchExecCalc(
new OneInputTransformation(
inputTransform,
- RelExplainUtil.calcToString(calcProgram, getExpressionString),
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
index b11a7a3..029c171 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -188,7 +188,7 @@ class BatchExecCorrelate(
val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[BaseRow]]
val operatorCtx = CodeGeneratorContext(config)
- CorrelateCodeGenerator.generateCorrelateTransformation(
+ val transformation = CorrelateCodeGenerator.generateCorrelateTransformation(
config,
operatorCtx,
inputTransformation,
@@ -202,6 +202,8 @@ class BatchExecCorrelate(
retainHeader = false,
getExpressionString,
"BatchExecCorrelate")
+ transformation.setName(getRelDetailedDescription)
+ transformation
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
index e6a9c8e..7c8ec3c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -99,10 +99,9 @@ class BatchExecExpand(
projects,
opName = "BatchExpand")
- val operatorName = s"BatchExecExpand: ${getRowType.getFieldList.map(_.getName).mkString(", ")}"
new OneInputTransformation(
inputTransform,
- operatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
index 2592ac1..502a932 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.table.api.TableException
import org.apache.flink.table.functions.UserDefinedFunction
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
import org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy
@@ -74,18 +74,6 @@ abstract class BatchExecGroupAggregateBase(
def getAggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)] = aggCallToAggFunction
- def aggOperatorName(prefix: String): String = {
- RelExplainUtil.aggOperatorName(
- prefix,
- grouping,
- auxGrouping,
- inputRowType,
- outputRowType,
- aggCallToAggFunction,
- isMerge,
- isFinal)
- }
-
protected def isEnforceTwoStageAgg: Boolean = {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
getAggPhaseStrategy(tableConfig) == AggregatePhaseStrategy.TWO_PHASE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
index 481f387..eac9b02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
@@ -147,9 +147,4 @@ class BatchExecHashAggregate(
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior = DamBehavior.FULL_DAM
-
- override def getOperatorName: String = {
- val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
- aggOperatorName(aggregateNamePrefix + "HashAggregate")
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 46e8efd..042df22 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -117,8 +117,6 @@ abstract class BatchExecHashAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}
- def getOperatorName: String
-
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
@@ -147,7 +145,7 @@ abstract class BatchExecHashAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
val ret = new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
index e029b0f..1d01f11 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -265,7 +265,7 @@ class BatchExecHashJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
build,
probe,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
probe.getParallelism)
@@ -273,14 +273,4 @@ class BatchExecHashJoin(
ret.setResources(resource, resource)
ret
}
-
- private def getOperatorName: String = {
- val joinExpressionStr = if (getCondition != null) {
- val inFields = inputRowType.getFieldNames.toList
- s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
- } else {
- ""
- }
- s"HashJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
index 8e19c75..840a2e5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
@@ -91,8 +91,4 @@ class BatchExecHashWindowAggregate(
override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
- override def getOperatorName: String = {
- val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
- aggregateNamePrefix + "WindowHashAggregateBatchExec"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 6074157..de0b97b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -116,8 +116,6 @@ abstract class BatchExecHashWindowAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}
- def getOperatorName: String
-
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
@@ -149,7 +147,7 @@ abstract class BatchExecHashWindowAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
val ret = new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
index 6b4ee94..ed64648 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -108,14 +108,9 @@ class BatchExecLimit(
val operator = new LimitOperator(isGlobal, limitStart, limitEnd)
new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
inputType,
input.getParallelism)
}
-
- private def getOperatorName = {
- val prefix = if (isGlobal) "Global" else "Local"
- s"${prefix}Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
index b48ba36..54d0544 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
@@ -127,6 +127,4 @@ class BatchExecLocalHashAggregate(
override def getDamBehavior: DamBehavior = {
if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.MATERIALIZING
}
-
- override def getOperatorName: String = aggOperatorName("LocalHashAggregate")
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
index aac9dc6..22162bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
@@ -86,8 +86,4 @@ class BatchExecLocalHashWindowAggregate(
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
-
- override def getOperatorName: String = {
- "LocalWindowHashAggregateBatchExec"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
index b615daa..c4078cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
@@ -135,7 +135,4 @@ class BatchExecLocalSortAggregate(
override def getDamBehavior: DamBehavior = {
if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.MATERIALIZING
}
-
- override def getOperatorName: String = aggOperatorName("LocalSortAggregate")
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index 02e2345..1f2482f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -86,6 +86,4 @@ class BatchExecLocalSortWindowAggregate(
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
-
- override def getOperatorName: String = "LocalSortWindowAggregateBatchExec"
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index db30712..8345e1d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -165,22 +165,11 @@ class BatchExecNestedLoopJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
lInput,
rInput,
- getOperatorName,
+ getRelDetailedDescription,
op,
BaseRowTypeInfo.of(outputType),
parallelism)
ret.setResources(resourceSpec, resourceSpec)
ret
}
-
- private def getOperatorName: String = {
- val joinExpressionStr = if (getCondition != null) {
- val inFields = inputRowType.getFieldNames.toList
- s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
- } else {
- ""
- }
- s"NestedLoopJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index abe140a..2baced1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -421,7 +421,11 @@ class BatchExecOverAggregate(
}
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
val ret = new OneInputTransformation(
- input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), input.getParallelism)
+ input,
+ getRelDetailedDescription,
+ operator,
+ BaseRowTypeInfo.of(outputType),
+ input.getParallelism)
ret.setResources(resource, resource)
ret
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
index 52d237f..15c2bea 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
@@ -288,17 +288,9 @@ class BatchExecRank(
new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
}
-
- private def getOperatorName: String = {
- if (isGlobal) {
- "GlobalRank"
- } else {
- "LocalRank"
- }
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
index 66df094..6c4ec67 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
@@ -128,7 +128,7 @@ class BatchExecSort(
val ret = new OneInputTransformation(
input,
- s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
+ getRelDetailedDescription,
operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
BaseRowTypeInfo.of(outputType),
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
index 9f9e665..02167a5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
@@ -161,10 +161,4 @@ class BatchExecSortAggregate(
override def getDamBehavior: DamBehavior = {
if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.PIPELINED
}
-
- override def getOperatorName: String = {
- val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
- aggOperatorName(aggregateNamePrefix + "SortAggregate")
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 80f56f2..9fd3a3c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -99,8 +99,6 @@ abstract class BatchExecSortAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}
- def getOperatorName: String
-
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val input = getInputNodes.get(0).translateToPlan(planner)
@@ -122,7 +120,7 @@ abstract class BatchExecSortAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 5426fb8..37635b2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -143,16 +143,9 @@ class BatchExecSortLimit(
new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
inputType,
input.getParallelism)
}
-
- private def getOperatorName = {
- s"${if (isGlobal) "Global" else "Local"}SortLimit(" +
- s"orderBy: [${RelExplainUtil.collationToString(sortCollation, getRowType)}], " +
- s"offset: $limitStart, " +
- s"fetch: ${RelExplainUtil.fetchToString(fetch)})"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index c15b6f7..c8c9b2d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -265,7 +265,7 @@ class BatchExecSortMergeJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftInput,
rightInput,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
rightInput.getParallelism)
@@ -278,12 +278,4 @@ class BatchExecSortMergeJoin(
val mq = relNode.getCluster.getMetadataQuery
mq.getAverageRowSize(relNode) * mq.getRowCount(relNode)
}
-
- private def getOperatorName: String = if (getCondition != null) {
- val inFields = inputRowType.getFieldNames.toList
- s"SortMergeJoin(where: ${
- getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)})"
- } else {
- "SortMergeJoin"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index fba66df..a472999 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -88,9 +88,4 @@ class BatchExecSortWindowAggregate(
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
-
- override def getOperatorName: String = {
- val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
- aggregateNamePrefix + "WindowSortAggregateBatchExec"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 8c621d5..1e1111e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -105,8 +105,6 @@ abstract class BatchExecSortWindowAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}
- def getOperatorName: String
-
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val input = getInputNodes.get(0).translateToPlan(planner)
@@ -137,7 +135,7 @@ abstract class BatchExecSortWindowAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
new OneInputTransformation(
input,
- getOperatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
index e1e13ea..8abba12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
@@ -80,6 +80,7 @@ class BatchExecValues(
getRelTypeName)
val transformation = planner.getExecEnv.createInput(inputFormat,
inputFormat.getProducedType).getTransformation
+ transformation.setName(getRelDetailedDescription)
transformation.setParallelism(1)
transformation
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index a249436..dc98107 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -119,7 +119,7 @@ class StreamExecCalc(
)
val ret = new OneInputTransformation(
inputTransform,
- RelExplainUtil.calcToString(calcProgram, getExpressionString),
+ getRelDetailedDescription,
substituteStreamOperator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
index ae5788a..0675669 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -135,6 +135,7 @@ class StreamExecCorrelate(
retainHeader = true,
getExpressionString,
"StreamExecCorrelate")
+ transform.setName(getRelDetailedDescription)
if (inputsContainSingleton()) {
transform.setParallelism(1)
transform.setMaxParallelism(1)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index 888ee6e..d43c223 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -82,8 +82,9 @@ class StreamExecDeduplicate(
override def explainTerms(pw: RelWriter): RelWriter = {
val fieldNames = getRowType.getFieldNames
+ val keep = if (keepLastRow) "LastRow" else "FirstRow"
super.explainTerms(pw)
- .item("keepLastRow", keepLastRow)
+ .item("keep", keep)
.item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
.item("order", "PROCTIME")
}
@@ -141,7 +142,7 @@ class StreamExecDeduplicate(
}
val ret = new OneInputTransformation(
inputTransform,
- getOperatorName,
+ getRelDetailedDescription,
operator,
rowTypeInfo,
inputTransform.getParallelism)
@@ -156,12 +157,4 @@ class StreamExecDeduplicate(
ret.setStateKeyType(selector.getProducedType)
ret
}
-
- private def getOperatorName: String = {
- val fieldNames = getRowType.getFieldNames
- val keyNames = uniqueKeys.map(fieldNames.get).mkString(", ")
- s"${if (keepLastRow) "keepLastRow" else "KeepFirstRow"}" +
- s": (key: ($keyNames), select: (${fieldNames.mkString(", ")}), order: (PROCTIME)"
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
index ec113a4..3abb0b9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
@@ -94,10 +94,9 @@ class StreamExecExpand(
opName = "StreamExpand",
retainHeader = true)
- val operatorName = s"StreamExecExpand: ${getRowType.getFieldList.map(_.getName).mkString(", ")}"
val transform = new OneInputTransformation(
inputTransform,
- operatorName,
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index ba27a5a..8c1e1cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -186,7 +186,7 @@ class StreamExecGlobalGroupAggregate(
// partitioned aggregation
val ret = new OneInputTransformation(
inputTransformation,
- "GlobalGroupAggregate",
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outRowType),
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 676d05a..b922c86 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -203,7 +203,7 @@ class StreamExecGroupAggregate(
// partitioned aggregation
val ret = new OneInputTransformation(
inputTransformation,
- "GroupAggregate",
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outRowType),
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index 212b01a..df6f95b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -218,17 +218,9 @@ class StreamExecGroupWindowAggregate(
inputRowTypeInfo.getLogicalTypes,
timeIdx)
- val operatorName = if (grouping.nonEmpty) {
- s"window: ($window), " +
- s"groupBy: (${RelExplainUtil.fieldToString(grouping, inputRowType)}), " +
- s"select: ($aggString)"
- } else {
- s"window: ($window), select: ($aggString)"
- }
-
val transformation = new OneInputTransformation(
inputTransform,
- operatorName,
+ getRelDetailedDescription,
operator,
outRowType,
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index ae52f71..5c7c9eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -183,7 +183,7 @@ class StreamExecIncrementalGroupAggregate(
// partitioned aggregation
val ret = new OneInputTransformation(
inputTransformation,
- "IncrementalGroupAggregate",
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outRowType),
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
index 77a8e09..eb989be 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
@@ -194,7 +194,7 @@ class StreamExecJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftTransform,
rightTransform,
- getJoinOperatorName(),
+ getRelDetailedDescription,
operator,
returnType,
leftTransform.getParallelism)
@@ -249,11 +249,4 @@ class StreamExecJoin(
}
smallest
}
-
- private def getJoinOperatorName(): String = {
- val where = RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)
- val select = getRowType.getFieldNames.mkString(", ")
- s"${flinkJoinType.toString}(where: ($where), select: ($select), " +
- s"leftInputSpec: ${analyzeJoinInput(left)}, rightInputSpec: ${analyzeJoinInput(right)})"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
index 98b4a02..b25d607 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
@@ -167,7 +167,7 @@ class StreamExecLimit(
// as input node is singleton exchange, its parallelism is 1.
val ret = new OneInputTransformation(
inputTransform,
- s"Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})",
+ getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index c898881..f128c60 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -146,7 +146,7 @@ class StreamExecLocalGroupAggregate(
val transformation = new OneInputTransformation(
inputTransformation,
- "LocalGroupAggregate",
+ getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outRowType),
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
index 6c07531..8415da3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
@@ -231,7 +231,7 @@ class StreamExecMatch(
val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
val transformation = new OneInputTransformation[BaseRow, BaseRow](
timestampedInput,
- toString,
+ getRelDetailedDescription,
operator,
outputRowTypeInfo,
timestampedInput.getParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index c957110..eae561a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -282,7 +282,7 @@ class StreamExecOverAggregate(
val ret = new OneInputTransformation(
inputDS,
- "OverAggregate",
+ getRelDetailedDescription,
operator,
returnTypeInfo,
inputDS.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
index d227dd6..08717d0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
@@ -204,7 +204,6 @@ class StreamExecRank(
generateRetraction,
outputRankNumber)
}
- val rankOpName = getOperatorName
val operator = new KeyedProcessOperator(processFunction)
processFunction.setKeyContext(operator)
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
@@ -213,7 +212,7 @@ class StreamExecRank(
FlinkTypeFactory.toLogicalRowType(getRowType))
val ret = new OneInputTransformation(
inputTransform,
- rankOpName,
+ getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
@@ -229,19 +228,6 @@ class StreamExecRank(
ret.setStateKeyType(selector.getProducedType)
ret
}
-
- private def getOperatorName: String = {
- val inputRowType = inputRel.getRowType
- var result = getStrategy().toString
- result += s"(orderBy: (${RelExplainUtil.collationToString(orderKey, inputRowType)})"
- if (partitionKey.nonEmpty) {
- val partitionKeys = partitionKey.toArray
- result += s", partitionBy: (${RelExplainUtil.fieldToString(partitionKeys, inputRowType)})"
- }
- result += s", ${getRowType.getFieldNames.mkString(", ")}"
- result += s", ${rankRange.toString(inputRowType.getFieldNames)})"
- result
- }
}
object StreamExecRank {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
index 3a7c927..f213253 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
@@ -132,7 +132,7 @@ class StreamExecSort(
// as input node is singleton exchange, its parallelism is 1.
val ret = new OneInputTransformation(
input,
- s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
+ getRelDetailedDescription,
sortOperator,
outputRowTypeInfo,
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
index b34975b..0a48d55 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
@@ -214,7 +214,7 @@ class StreamExecSortLimit(
val ret = new OneInputTransformation(
inputTransform,
- getOperatorName,
+ getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
@@ -229,11 +229,4 @@ class StreamExecSortLimit(
ret.setStateKeyType(selector.getProducedType)
ret
}
-
- private def getOperatorName = {
- s"SortLimit(" +
- s"orderBy: [${RelExplainUtil.collationToString(sortCollation, getRowType)}], " +
- s"offset: $limitStart, " +
- s"fetch: ${RelExplainUtil.fetchToString(fetch)})"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
index 9002452..4e5e327 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
@@ -140,7 +140,7 @@ class StreamExecTemporalJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftTransform,
rightTransform,
- getJoinOperatorName,
+ getRelDetailedDescription,
joinOperator,
BaseRowTypeInfo.of(returnType),
leftTransform.getParallelism)
@@ -176,12 +176,6 @@ class StreamExecTemporalJoin(
}
})
}
-
- private def getJoinOperatorName: String = {
- val where = RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)
- val select = getRowType.getFieldNames.mkString(", ")
- s"TemporalTableJoin(where: ($where), select: ($select)"
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
index 030c483..aac4a7d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
@@ -147,7 +147,7 @@ class StreamExecTemporalSort(
// as input node is singleton exchange, its parallelism is 1.
val ret = new OneInputTransformation(
input,
- "ProcTimeSortOperator",
+ getRelDetailedDescription,
sortOperator,
outputRowTypeInfo,
input.getParallelism)
@@ -187,7 +187,7 @@ class StreamExecTemporalSort(
val ret = new OneInputTransformation(
input,
- "RowTimeSortOperator",
+ getRelDetailedDescription,
sortOperator,
outputRowTypeInfo,
input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
index a95558b..6ec1d1f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
@@ -82,6 +82,7 @@ class StreamExecValues(
getRelTypeName)
val transformation = planner.getExecEnv.createInput(inputFormat,
inputFormat.getProducedType).getTransformation
+ transformation.setName(getRelDetailedDescription)
transformation.setParallelism(1)
transformation.setMaxParallelism(1)
transformation
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
index 0db10e4..871d54e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
@@ -119,39 +119,31 @@ class StreamExecWatermarkAssigner(
val idleTimeout = getMillisecondFromConfigDuration(config,
ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT)
- val (operator, opName) = if (inferredInterval.mode == MiniBatchMode.None ||
+ val operator = if (inferredInterval.mode == MiniBatchMode.None ||
inferredInterval.interval == 0) {
require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
require(watermarkDelay.isDefined, "watermarkDelay should not be None")
// 1. redundant watermark definition in DDL
// 2. existing window aggregate
// 3. operator requiring watermark, but minibatch is not enabled
- val op = new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
- val opName =
- s"WatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}, offset: ${watermarkDelay.get})"
- (op, opName)
+ new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
} else if (inferredInterval.mode == MiniBatchMode.ProcTime) {
- val op = new MiniBatchAssignerOperator(inferredInterval.interval)
- val opName = s"MiniBatchAssigner(intervalMs: ${inferredInterval.interval})"
- (op, opName)
+ new MiniBatchAssignerOperator(inferredInterval.interval)
} else {
require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
require(watermarkDelay.isDefined, "watermarkDelay should not be None")
- val op = new MiniBatchedWatermarkAssignerOperator(
+ new MiniBatchedWatermarkAssignerOperator(
rowtimeFieldIndex.get,
watermarkDelay.get,
0,
idleTimeout,
inferredInterval.interval)
- val opName = s"MiniBatchedWatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}," +
- s" offset: ${watermarkDelay.get}, intervalMs: ${inferredInterval.interval})"
- (op, opName)
}
val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
val transformation = new OneInputTransformation[BaseRow, BaseRow](
inputTransformation,
- opName,
+ getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
index d1127cc..2b09625 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
@@ -29,7 +29,8 @@ import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, KeySelectorUtil, RelExplainUtil, UpdatingPlanChecker, WindowJoinUtil}
+import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, KeySelectorUtil, UpdatingPlanChecker, WindowJoinUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
import org.apache.flink.table.runtime.generated.GeneratedFunction
import org.apache.flink.table.runtime.operators.join.{FlinkJoinType, KeyedCoProcessOperatorWithWatermarkDelay, OuterJoinPaddingUtil, ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin}
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -105,8 +106,8 @@ class StreamExecWindowJoin(
super.explainTerms(pw)
.item("joinType", flinkJoinType.toString)
.item("windowBounds", windowBounds)
- .item("where",
- RelExplainUtil.expressionToString(joinCondition, outputRowType, getExpressionString))
+ .item("where", getExpressionString(
+ joinCondition, outputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
.item("select", getRowType.getFieldNames.mkString(", "))
}
@@ -294,7 +295,7 @@ class StreamExecWindowJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftPlan,
rightPlan,
- "Co-Process",
+ getRelDetailedDescription,
new LegacyKeyedCoProcessOperator(procJoinFunc).
asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
returnTypeInfo,
@@ -338,7 +339,7 @@ class StreamExecWindowJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
leftPlan,
rightPlan,
- "Co-Process",
+ getRelDetailedDescription,
new KeyedCoProcessOperatorWithWatermarkDelay(rowJoinFunc, rowJoinFunc.getMaxOutputDelay)
.asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
returnTypeInfo,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
index 750cb56..34a4d9a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
@@ -82,7 +82,7 @@ object ExecNodePlanDumper {
*/
def dagToString(
nodes: Seq[ExecNode[_, _]],
- detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withExecNodeId: Boolean = false,
withRetractTraits: Boolean = false,
withOutputType: Boolean = false,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index 832f059..acf20ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -70,7 +70,7 @@ object FlinkRelOptUtil {
*/
def toString(
rel: RelNode,
- detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withIdPrefix: Boolean = false,
withRetractTraits: Boolean = false,
withRowType: Boolean = false): String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
index 6cf72aa..15511e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import com.google.common.collect.ImmutableMap
-import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.{RelCollation, RelWriter}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Window.Group
import org.apache.calcite.rel.core.{AggregateCall, Window}
@@ -47,6 +47,19 @@ import scala.collection.mutable
object RelExplainUtil {
/**
+ * Returns the preferred [[ExpressionFormat]] of the [[RelWriter]]. Use Prefix for
+ * traditional writers, but use Infix for [[RelDescriptionWriterImpl]], which is more readable.
+ * The [[RelDescriptionWriterImpl]] is mainly used to generate
+ * [[org.apache.flink.table.planner.plan.nodes.FlinkRelNode#getRelDetailedDescription()]].
+ */
+ def preferExpressionFormat(pw: RelWriter): ExpressionFormat = pw match {
+ // infix format is more readable for displaying
+ case _: RelDescriptionWriterImpl => ExpressionFormat.Infix
+ // traditional writer prefers prefix expression format, e.g. +(x, y)
+ case _ => ExpressionFormat.Prefix
+ }
+
+ /**
* Converts field names corresponding to given indices to String.
*/
def fieldToString(fieldIndices: Array[Int], inputType: RelDataType): String = {
@@ -574,33 +587,16 @@ object RelExplainUtil {
buf.toString
}
- def calcToString(
- calcProgram: RexProgram,
- f: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String): String = {
- val inFields = calcProgram.getInputRowType.getFieldNames.toList
- val localExprs = calcProgram.getExprList.toList
- val selectionStr = selectionToString(calcProgram, f, ExpressionFormat.Infix)
- val cond = calcProgram.getCondition
- val name = s"${
- if (cond != null) {
- s"where: ${
- f(cond, inFields, Some(localExprs), ExpressionFormat.Infix)}, "
- } else {
- ""
- }
- }select: ($selectionStr)"
- s"Calc($name)"
- }
-
def conditionToString(
calcProgram: RexProgram,
- f: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+ f: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String,
+ expressionFormat: ExpressionFormat = ExpressionFormat.Prefix): String = {
val cond = calcProgram.getCondition
val inputFieldNames = calcProgram.getInputRowType.getFieldNames.toList
val localExprs = calcProgram.getExprList.toList
if (cond != null) {
- f(cond, inputFieldNames, Some(localExprs))
+ f(cond, inputFieldNames, Some(localExprs), expressionFormat)
} else {
""
}
@@ -626,16 +622,6 @@ object RelExplainUtil {
}.mkString(", ")
}
- def correlateOpName(
- inputType: RelDataType,
- rexCall: RexCall,
- sqlFunction: TableSqlFunction,
- rowType: RelDataType,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
- s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," +
- s" select: ${rowType.getFieldNames.mkString(",")}"
- }
-
def correlateToString(
inputType: RelDataType,
rexCall: RexCall,
@@ -647,39 +633,6 @@ object RelExplainUtil {
s"table($udtfName($operands))"
}
- def aggOperatorName(
- prefix: String,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- inputRowType: RelDataType,
- outputRowType: RelDataType,
- aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
- isMerge: Boolean,
- isFinal: Boolean): String = {
- val groupingStr = if (grouping.nonEmpty) {
- s"groupBy:(${fieldToString(grouping, inputRowType)}),"
- } else {
- ""
- }
- val auxGroupingStr = if (auxGrouping.nonEmpty) {
- s"auxGrouping:(${fieldToString(auxGrouping, inputRowType)}),"
- } else {
- ""
- }
-
- val selectString = s"select:(${
- groupAggregationToString(
- inputRowType,
- outputRowType,
- grouping,
- auxGrouping,
- aggCallToAggFunction,
- isMerge,
- isFinal)
- }),"
- s"$prefix($groupingStr$auxGroupingStr$selectString)"
- }
-
def windowAggregationToString(
inputType: RelDataType,
grouping: Array[Int],
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 7c1013a..37f55a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -131,7 +131,8 @@ object ScanUtil {
* @param qualifiedName qualified name for table
*/
private[flink] def getOperatorName(qualifiedName: Seq[String], rowType: RelDataType): String = {
- val s = s"table:$qualifiedName, fields:(${rowType.getFieldNames.mkString(", ")})"
- s"SourceConversion($s)"
+ val tableQualifiedName = qualifiedName.mkString(".")
+ val fieldNames = rowType.getFieldNames.mkString(", ")
+ s"SourceConversion(table=[$tableQualifiedName], fields=[$fieldNames])"
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
index 223aeee..56d2ca8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
@@ -16,17 +16,17 @@ HashJoin(joinType=[InnerJoin], where=[=(s3, s30)], select=[b1, l2, s3, d4, dd5,
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+ content : SourceConversion(table=[default_catalog.default_database.T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]], fields=[b1, l2, s3, d4, dd5])
ship_strategy : FORWARD
: Data Source
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+ content : SourceConversion(table=[default_catalog.default_database.T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]], fields=[b1, l2, s3, d4, dd5])
ship_strategy : FORWARD
: Operator
- content : HashJoin(where: (s3 = s30), buildLeft)
+ content : HashJoin(joinType=[InnerJoin], where=[(s3 = s30)], select=[b1, l2, s3, d4, dd5, b10, l20, s30, d40, dd50], isBroadcast=[true], build=[left])
ship_strategy : BROADCAST
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
index 8eda3a8..06d8b88 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
@@ -37,23 +37,23 @@ Calc(select=[EXPR$0])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+ content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
ship_strategy : FORWARD
: Operator
- content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS EXPR$0),)
+ content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
ship_strategy : HASH[a]
: Operator
- content : Calc(select: (EXPR$0))
+ content : Calc(select=[EXPR$0])
ship_strategy : FORWARD
]]>
@@ -80,23 +80,23 @@ Calc(select=[EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, m
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+ content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
ship_strategy : FORWARD
: Operator
- content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS EXPR$0),)
+ content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
ship_strategy : HASH[a]
: Operator
- content : Calc(select: (EXPR$0))
+ content : Calc(select=[EXPR$0])
ship_strategy : FORWARD
]]>
@@ -116,7 +116,7 @@ BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -136,7 +136,7 @@ BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -158,11 +158,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
ship_strategy : FORWARD
]]>
@@ -184,11 +184,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)]): rowcount = , cumulative cost =
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
ship_strategy : FORWARD
]]>
@@ -219,19 +219,19 @@ Calc(select=[a, b, c, e, f])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
: Operator
- content : SortMergeJoin(where: (a = d))
+ content : SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f])
ship_strategy : HASH[a]
: Operator
- content : Calc(select: (a, b, c, e, f))
+ content : Calc(select=[a, b, c, e, f])
ship_strategy : FORWARD
]]>
@@ -262,19 +262,19 @@ Calc(select=[a, b, c, e, f]): rowcount = , cumulative cost = {rows, cpu, io, net
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
: Operator
- content : SortMergeJoin(where: (a = d))
+ content : SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f])
ship_strategy : HASH[a]
: Operator
- content : Calc(select: (a, b, c, e, f))
+ content : Calc(select=[a, b, c, e, f])
ship_strategy : FORWARD
]]>
@@ -317,23 +317,23 @@ Sink(name=[sink2], fields=[a, cnt])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+ content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
ship_strategy : FORWARD
: Operator
- content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS cnt),)
+ content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
ship_strategy : HASH[a]
: Operator
- content : Calc(where: (cnt > 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt > 10)])
ship_strategy : FORWARD
: Operator
@@ -341,7 +341,7 @@ Sink(name=[sink2], fields=[a, cnt])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (cnt < 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt < 10)])
ship_strategy : FORWARD
: Operator
@@ -396,23 +396,23 @@ Sink(name=[sink2], fields=[a, cnt]): rowcount = , cumulative cost = {rows, cpu,
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+ content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
ship_strategy : FORWARD
: Operator
- content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS cnt),)
+ content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
ship_strategy : HASH[a]
: Operator
- content : Calc(where: (cnt > 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt > 10)])
ship_strategy : FORWARD
: Operator
@@ -420,7 +420,7 @@ Sink(name=[sink2], fields=[a, cnt]): rowcount = , cumulative cost = {rows, cpu,
ship_strategy : FORWARD
: Operator
- content : Calc(where: (cnt < 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt < 10)])
ship_strategy : FORWARD
: Operator
@@ -456,11 +456,11 @@ Sink(name=[sink], fields=[a, b, c])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (a > 10), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[(a > 10)])
ship_strategy : FORWARD
: Operator
@@ -492,11 +492,11 @@ Sink(name=[sink], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu,
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (a > 10), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[(a > 10)])
ship_strategy : FORWARD
: Operator
@@ -528,15 +528,15 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : LocalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[false])
ship_strategy : FORWARD
: Operator
- content : GlobalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
ship_strategy : GLOBAL
]]>
@@ -560,15 +560,15 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true]): rowcount = , c
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : LocalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[false])
ship_strategy : FORWARD
: Operator
- content : GlobalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
ship_strategy : GLOBAL
]]>
@@ -588,7 +588,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -608,7 +608,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -636,11 +636,11 @@ Union(all=[true], union=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, i
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
]]>
@@ -668,11 +668,11 @@ Union(all=[true], union=[a, b, c])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index 16d3278..f50b17e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -30,7 +30,7 @@ DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a,
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -56,19 +56,19 @@ Calc(select=[EXPR$0], updateAsRetraction=[false], accMode=[Acc]): rowcount = , c
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : GroupAggregate
+ content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS EXPR$0])
ship_strategy : HASH
: Operator
- content : Calc(select: (EXPR$0))
+ content : Calc(select=[EXPR$0])
ship_strategy : FORWARD
]]>
@@ -88,7 +88,7 @@ DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a,
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -108,7 +108,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -128,7 +128,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
ship_strategy : FORWARD
]]>
@@ -154,19 +154,19 @@ Calc(select=[EXPR$0])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : GroupAggregate
+ content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS EXPR$0])
ship_strategy : HASH
: Operator
- content : Calc(select: (EXPR$0))
+ content : Calc(select=[EXPR$0])
ship_strategy : FORWARD
]]>
@@ -188,11 +188,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
ship_strategy : FORWARD
]]>
@@ -214,11 +214,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)], updateAsRetraction=[false], accM
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
ship_strategy : FORWARD
]]>
@@ -249,19 +249,19 @@ Calc(select=[a, b, c, e, f])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
: Operator
- content : InnerJoin(where: (=(a, d)), select: (a, b, c, d, e, f), leftInputSpec: NoUniqueKey, rightInputSpec: NoUniqueKey)
+ content : Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
ship_strategy : HASH
: Operator
- content : Calc(select: (a, b, c, e, f))
+ content : Calc(select=[a, b, c, e, f])
ship_strategy : FORWARD
]]>
@@ -289,11 +289,11 @@ Union(all=[true], union=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): r
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
]]>
@@ -324,19 +324,19 @@ Calc(select=[a, b, c, e, f], updateAsRetraction=[false], accMode=[Acc]): rowcoun
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
: Operator
- content : InnerJoin(where: (=(a, d)), select: (a, b, c, d, e, f), leftInputSpec: NoUniqueKey, rightInputSpec: NoUniqueKey)
+ content : Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
ship_strategy : HASH
: Operator
- content : Calc(select: (a, b, c, e, f))
+ content : Calc(select=[a, b, c, e, f])
ship_strategy : FORWARD
]]>
@@ -378,19 +378,19 @@ Sink(name=[upsertSink2], fields=[a, cnt])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : GroupAggregate
+ content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])
ship_strategy : HASH
: Operator
- content : Calc(where: (cnt > 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt > 10)])
ship_strategy : FORWARD
: Operator
@@ -402,7 +402,7 @@ Sink(name=[upsertSink2], fields=[a, cnt])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (cnt < 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt < 10)])
ship_strategy : FORWARD
: Operator
@@ -460,19 +460,19 @@ Sink(name=[upsertSink2], fields=[a, cnt], updateAsRetraction=[false], accMode=[A
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (a))
+ content : Calc(select=[a])
ship_strategy : FORWARD
: Operator
- content : GroupAggregate
+ content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])
ship_strategy : HASH
: Operator
- content : Calc(where: (cnt > 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt > 10)])
ship_strategy : FORWARD
: Operator
@@ -484,7 +484,7 @@ Sink(name=[upsertSink2], fields=[a, cnt], updateAsRetraction=[false], accMode=[A
ship_strategy : FORWARD
: Operator
- content : Calc(where: (cnt < 10), select: (a, cnt))
+ content : Calc(select=[a, cnt], where=[(cnt < 10)])
ship_strategy : FORWARD
: Operator
@@ -524,11 +524,11 @@ Sink(name=[appendSink], fields=[a, b, c])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (a > 10), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[(a > 10)])
ship_strategy : FORWARD
: Operator
@@ -560,11 +560,11 @@ Sink(name=[appendSink], fields=[a, b, c], updateAsRetraction=[false], accMode=[A
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : Calc(where: (a > 10), select: (a, b, c))
+ content : Calc(select=[a, b, c], where=[(a > 10)])
ship_strategy : FORWARD
: Operator
@@ -595,11 +595,11 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
ship_strategy : GLOBAL
]]>
@@ -622,11 +622,11 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+ content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
ship_strategy : GLOBAL
]]>
@@ -689,35 +689,35 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+ content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, text, rowtime])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 2, offset: 0)
+ content : WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+ content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, cnt, name, goods, rowtime])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 4, offset: 0)
+ content : WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : Co-Process
+ content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
ship_strategy : HASH
: Operator
- content : Calc(select: (id1, rowtime AS ts, text))
+ content : Calc(select=[id1, rowtime AS ts, text])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
+ content : Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
+ content : Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -769,11 +769,11 @@ Union(all=[true], union=[a, b, c])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+ content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
ship_strategy : FORWARD
]]>
@@ -836,35 +836,35 @@ Sink(name=[appendSink2], fields=[a, b])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+ content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, text, rowtime])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 2, offset: 0)
+ content : WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+ content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, cnt, name, goods, rowtime])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 4, offset: 0)
+ content : WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : Co-Process
+ content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
ship_strategy : HASH
: Operator
- content : Calc(select: (id1, rowtime AS ts, text))
+ content : Calc(select=[id1, rowtime AS ts, text])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
+ content : Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
+ content : Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
index 217eb87..d2f4740 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
@@ -29,7 +29,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -88,7 +88,7 @@ LogicalSort(offset=[10], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -126,7 +126,7 @@ LogicalSort(offset=[0], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -143,7 +143,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
index 92aa3c5..15f2dc0 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
]]>
</Resource>
</TestCase>
@@ -107,7 +107,7 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
]]>
</Resource>
</TestCase>
@@ -182,7 +182,7 @@ LogicalMinus(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
index 74112af..fb45f0c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
@@ -49,7 +49,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -85,7 +85,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -141,7 +141,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -198,7 +198,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index a7d748c..a263d16 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -61,7 +61,7 @@ LogicalProject(b=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -90,7 +90,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
+- LogicalProject(k=[$0], v=[$1])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -121,7 +121,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
+- LogicalProject(x=[$0], y=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -151,7 +151,7 @@ LogicalProject(a=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -180,7 +180,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -209,7 +209,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
+- LogicalProject(id=[$0], point=[$1])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -238,7 +238,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -262,7 +262,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
+- LogicalProject(s=[$0], t=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -284,7 +284,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
index c517550..fa537d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
@@ -25,16 +25,16 @@ limitations under the License.
LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)]
+- LogicalUnion(all=[true]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- LogicalProject(EXPR$0=[1], EXPR$1=[2.0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(2, 1) EXPR$1)]
- : +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+- LogicalProject(EXPR$0=[3], EXPR$1=[4:BIGINT]), rowType=[RecordType(INTEGER EXPR$0, BIGINT EXPR$1)]
- +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- Calc(select=[1 AS EXPR$0, 2.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
-: +- Values(tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+: +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+- Calc(select=[3 AS EXPR$0, 4.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Reused(reference_id=[1]), rowType=[RecordType(INTEGER ZERO)]
]]>
@@ -47,12 +47,12 @@ Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, D
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -65,18 +65,18 @@ Values(tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
LogicalProject(a=[$0], b=[$1])
+- LogicalUnion(all=[true])
:- LogicalProject(EXPR$0=[1], EXPR$1=[2])
- : +- LogicalValues(tuples=[[{ 0 }]])
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
:- LogicalProject(EXPR$0=[3], EXPR$1=[null:INTEGER])
- : +- LogicalValues(tuples=[[{ 0 }]])
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
+- LogicalProject(EXPR$0=[4], EXPR$1=[5])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1])
:- Calc(select=[1 AS EXPR$0, CAST(2) AS EXPR$1])
-: +- Values(tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1])
+: +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1])
:- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
: +- Reused(reference_id=[1])
+- Calc(select=[4 AS EXPR$0, CAST(5) AS EXPR$1])
@@ -91,13 +91,13 @@ Union(all=[true], union=[EXPR$0, EXPR$1])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[null:INTEGER])
-+- LogicalValues(tuples=[[{ 0 }]])
++- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[null:INTEGER AS EXPR$0])
-+- Values(tuples=[[{ 0 }]], values=[ZERO])
++- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
index 43d4ef2..5d81924 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
@@ -358,7 +358,7 @@ HashJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0], isBr
:- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
: +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
+- Exchange(distribution=[broadcast])
- +- Values(tuples=[[]], values=[k, v])
+ +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
index a4ced57..f93aa46 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -436,7 +436,7 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[d, e, f, g, h, a, b, c], build=[right])
:- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[broadcast])
- +- Values(tuples=[[]], values=[a, b, c])
+ +- Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -551,7 +551,7 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
<![CDATA[
NestedLoopJoin(joinType=[RightOuterJoin], where=[true], select=[d, e, f, g, h, a, b, c], build=[left])
:- Exchange(distribution=[broadcast])
-: +- Values(tuples=[[]], values=[d, e, f, g, h])
+: +- Values(type=[RecordType(INTEGER d, BIGINT e, INTEGER f, VARCHAR(2147483647) g, BIGINT h)], tuples=[[]], values=[d, e, f, g, h])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -697,7 +697,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0]
:- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
: +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
+- Exchange(distribution=[broadcast])
- +- Values(tuples=[[]], values=[k, v])
+ +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
index f19e85a..a1c52e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -475,7 +475,7 @@ SortMergeJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0])
: +- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
: +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
+- Exchange(distribution=[hash[k]])
- +- Values(tuples=[[]], values=[k, v])
+ +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
index 86fa1c9..2ca68f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
@@ -89,7 +89,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalCalc(expr#0=[{inputs}], expr#1=[true], i=[$t1])
+- LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a1)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -166,7 +166,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalCalc(expr#0=[{inputs}], expr#1=[true], i=[$t1])
+- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER $f0)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -188,7 +188,7 @@ LogicalProject(a1=[$0], c1=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t1, $t2)], proj#0..1=[{exprs}], $condition=[$t3])
+LogicalCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t1, $t2)], a1=[$t0], c1=[$t1], $condition=[$t3])
+- LogicalAggregate(group=[{0}], c1=[COUNT($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
]]>
@@ -279,7 +279,7 @@ LogicalProject(a2=[$0], b2=[$1], d2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
+LogicalCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[>($t2, $t3)], a2=[$t0], b2=[$t1], d2=[$t2], $condition=[$t4])
+- LogicalAggregate(group=[{0, 1}], d2=[SUM($3)])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
]]>
@@ -368,7 +368,7 @@ LogicalProject(a=[$0], b2=[$1], d2=[$3])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalCalc(expr#0..2=[{inputs}], proj#0..2=[{exprs}])
+LogicalCalc(expr#0..2=[{inputs}], a=[$t0], b2=[$t1], d2=[$t2])
+- LogicalAggregate(group=[{0, 1}], d2=[SUM($3)])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
index c92e2a3..624bc19 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
@@ -465,13 +465,13 @@ FlinkLogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MI
<Resource name="planBefore">
<![CDATA[
LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MIN($2)])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
FlinkLogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MIN($2)])
-+- FlinkLogicalValues(tuples=[[{ 1, 2, 3 }]])
++- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
]]>
</Resource>
</TestCase>
@@ -501,7 +501,7 @@ FlinkLogicalCalc(select=[a, b, c])
+- FlinkLogicalCalc(select=[true AS i])
+- FlinkLogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
+- FlinkLogicalCalc(select=[a])
- +- FlinkLogicalValues(tuples=[[]])
+ +- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -521,7 +521,7 @@ LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
<![CDATA[
FlinkLogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
+- FlinkLogicalCalc(select=[a])
- +- FlinkLogicalValues(tuples=[[]])
+ +- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
index 649de95..1102b0a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
@@ -39,7 +39,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject($f0=[IS NOT NULL($0)])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalProject(i=[true])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -63,7 +63,7 @@ LogicalSort(fetch=[0])
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[OR(=($0, $3), IS NULL($0), IS NULL($3))], joinType=[anti])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -87,7 +87,7 @@ LogicalSort(fetch=[0])
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -110,7 +110,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -137,7 +137,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject($f0=[IS NOT NULL($0)])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalProject(i=[true])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -154,7 +154,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -171,7 +171,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], offset=[10], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -190,7 +190,7 @@ LogicalProject(a=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalValues(tuples=[[]])
++- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -207,7 +207,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
index d32afbb..cc063dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
@@ -34,7 +34,7 @@ LogicalProject(d=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[]])
++- LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index c42adc9..adb53b9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -63,7 +63,7 @@ LogicalProject(b=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -92,7 +92,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
+- LogicalProject(k=[$0], v=[$1])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -125,7 +125,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
+- LogicalProject(x=[$0], y=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -159,7 +159,7 @@ LogicalProject(a=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -187,7 +187,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -218,7 +218,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
+- LogicalProject(id=[$0], point=[$1])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -246,7 +246,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -272,7 +272,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
+- LogicalProject(s=[$0], t=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -298,7 +298,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
index 3df36eb..3ded214 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
@@ -89,7 +89,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalProject(i=[true])
+- LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER a1)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -166,7 +166,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
+- LogicalAggregate(group=[{}], m=[MIN($0)])
+- LogicalProject(i=[true])
+- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
- +- LogicalValues(tuples=[[]])
+ +- LogicalValues(type=[RecordType(INTEGER $f0)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index cfae4e1..f0531e6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -588,7 +588,7 @@ LogicalFilter(condition=[=($0, $cor0.a)])
LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(c, d)]]])
})], variablesSet=[[$cor0]])
+- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
- +- LogicalValues(tuples=[[{ 1, 2 }]])
+ +- LogicalValues(type=[RecordType(INTEGER EXPR$0, INTEGER EXPR$1)], tuples=[[{ 1, 2 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -596,7 +596,7 @@ LogicalFilter(condition=[=($0, $cor0.a)])
LogicalProject(a=[$0], b=[$1])
+- LogicalJoin(condition=[=($2, $0)], joinType=[semi])
:- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
- : +- LogicalValues(tuples=[[{ 1, 2 }]])
+ : +- LogicalValues(type=[RecordType(INTEGER EXPR$0, INTEGER EXPR$1)], tuples=[[{ 1, 2 }]])
+- LogicalProject(c=[$0])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(c, d)]]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
index 607f9f4..d9904a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
@@ -29,7 +29,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -86,7 +86,7 @@ LogicalSort(offset=[10], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -103,7 +103,7 @@ LogicalSort(offset=[0], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -120,7 +120,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -137,7 +137,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIMESTAMP(3) proctime, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -154,7 +154,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIMESTAMP(3) desc, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -171,7 +171,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIMESTAMP(3) desc)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -188,7 +188,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIMESTAMP(3) proctime)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -205,7 +205,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -222,7 +222,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIME ATTRIBUTE(ROWTIME) desc, BIGINT c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -239,7 +239,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIME ATTRIBUTE(ROWTIME) desc)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -256,7 +256,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 9a1d03b..1dcbe67 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -301,43 +301,43 @@ Sink(name=[appendSink3], fields=[a, b])
content : collect elements with CollectionInputFormat
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, rowtime, text))
+ content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 1, offset: 0)
+ content : WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, rowtime, cnt, name, goods))
+ content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods])
ship_strategy : FORWARD
: Operator
- content : WatermarkAssigner(rowtime: 1, offset: 0)
+ content : WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
ship_strategy : FORWARD
: Operator
- content : Co-Process
+ content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
ship_strategy : HASH
: Operator
- content : Calc(select: (id1, rowtime AS ts, text))
+ content : Calc(select=[id1, rowtime AS ts, text])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
+ content : Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
ship_strategy : HASH
: Operator
- content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
+ content : Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3))
+ content : Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+ content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
ship_strategy : HASH
: Operator
@@ -357,15 +357,15 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, text))
+ content : Calc(select=[id1, text])
ship_strategy : FORWARD
: Operator
- content : LocalGroupAggregate
+ content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
ship_strategy : FORWARD
: Operator
- content : GlobalGroupAggregate
+ content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
ship_strategy : HASH
: Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 4317d07..1afa05f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -203,7 +203,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Deduplicate(keepLastRow=[false], key=[a], order=[PROCTIME])
++- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -232,7 +232,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c])
-+- Deduplicate(keepLastRow=[true], key=[a], order=[PROCTIME])
++- Deduplicate(keep=[LastRow], key=[a], order=[PROCTIME])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
index d4b1f97..b28a386 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -107,7 +107,7 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -182,7 +182,7 @@ LogicalMinus(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
index 03d9b69..b9ce519 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
@@ -217,7 +217,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -256,7 +256,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -295,7 +295,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -418,7 +418,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -562,7 +562,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -601,7 +601,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -640,7 +640,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -763,7 +763,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -802,7 +802,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -841,7 +841,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -880,7 +880,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -919,7 +919,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -958,7 +958,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -997,7 +997,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -1036,7 +1036,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -1180,7 +1180,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index e811758..b42e44f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -61,7 +61,7 @@ LogicalProject(b=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -87,7 +87,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
+- LogicalProject(k=[$0], v=[$1])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -118,7 +118,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
+- LogicalProject(x=[$0], y=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -148,7 +148,7 @@ LogicalProject(a=[$0], s=[$2])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -174,7 +174,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(c=[$cor0.c])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -203,7 +203,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
+- LogicalProject(id=[$0], point=[$1])
+- Uncollect
+- LogicalProject(set=[$cor0.set])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -229,7 +229,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
+- LogicalProject(s=[$0])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -253,7 +253,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
+- LogicalProject(s=[$0], t=[$1])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
@@ -275,7 +275,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
+- Uncollect
+- LogicalProject(b=[$cor0.b])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
index d2ec0f4..6f7026f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
@@ -25,16 +25,16 @@ limitations under the License.
LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)]
+- LogicalUnion(all=[true]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- LogicalProject(EXPR$0=[1], EXPR$1=[2.0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(2, 1) EXPR$1)]
- : +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+- LogicalProject(EXPR$0=[3], EXPR$1=[4:BIGINT]), rowType=[RecordType(INTEGER EXPR$0, BIGINT EXPR$1)]
- +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
:- Calc(select=[1 AS EXPR$0, 2.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
-: +- Values(tuples=[[{ 0 }]], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+: +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+- Calc(select=[3 AS EXPR$0, 4.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Reused(reference_id=[1]), rowType=[RecordType(INTEGER ZERO)]
]]>
@@ -47,12 +47,12 @@ Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, D
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[{ 1, 2, 3 }]])
+Values(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
]]>
</Resource>
</TestCase>
@@ -65,18 +65,18 @@ Values(tuples=[[{ 1, 2, 3 }]])
LogicalProject(a=[$0], b=[$1])
+- LogicalUnion(all=[true])
:- LogicalProject(EXPR$0=[1], EXPR$1=[2])
- : +- LogicalValues(tuples=[[{ 0 }]])
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
:- LogicalProject(EXPR$0=[3], EXPR$1=[null:INTEGER])
- : +- LogicalValues(tuples=[[{ 0 }]])
+ : +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
+- LogicalProject(EXPR$0=[4], EXPR$1=[5])
- +- LogicalValues(tuples=[[{ 0 }]])
+ +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1])
:- Calc(select=[1 AS EXPR$0, CAST(2) AS EXPR$1])
-: +- Values(tuples=[[{ 0 }]], reuse_id=[1])
+: +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], reuse_id=[1])
:- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
: +- Reused(reference_id=[1])
+- Calc(select=[4 AS EXPR$0, CAST(5) AS EXPR$1])
@@ -91,13 +91,13 @@ Union(all=[true], union=[EXPR$0, EXPR$1])
<Resource name="planBefore">
<![CDATA[
LogicalProject(EXPR$0=[null:INTEGER])
-+- LogicalValues(tuples=[[{ 0 }]])
++- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[null:INTEGER AS EXPR$0])
-+- Values(tuples=[[{ 0 }]])
++- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index e60b372..5d63a64 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -1077,7 +1077,7 @@ Join(joinType=[LeftOuterJoin], where=[=(key, key0)], select=[key, v, key0, v0],
: +- Calc(select=[CAST(0:BIGINT) AS key, v], where=[=(key, 0:BIGINT)], updateAsRetraction=[true], accMode=[Acc])
: +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(key, v)]]], fields=[key, v], updateAsRetraction=[true], accMode=[Acc])
+- Exchange(distribution=[hash[key]], updateAsRetraction=[true], accMode=[Acc])
- +- Values(tuples=[[]], updateAsRetraction=[true], accMode=[Acc])
+ +- Values(type=[RecordType(BIGINT key, VARCHAR(2147483647) v)], tuples=[[]], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index f0d9841..5053e8d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -99,7 +99,7 @@ LogicalProject(a=[$5], c=[$7], c0=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c, BIGINT c0)], tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index b117a11..e0d629d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -241,7 +241,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
def verifyPlan(sql: String): Unit = {
doVerifyPlan(
sql,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = false,
printPlanBefore = true)
}
@@ -249,7 +249,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
def verifyPlan(table: Table): Unit = {
doVerifyPlan(
table,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = false,
printPlanBefore = true)
}
@@ -257,7 +257,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
def verifyPlanWithType(sql: String): Unit = {
doVerifyPlan(
sql,
- explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = true,
printPlanBefore = true)
}
@@ -265,7 +265,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
def verifyPlanWithType(table: Table): Unit = {
doVerifyPlan(
table,
- explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = true,
printPlanBefore = true)
}
@@ -279,7 +279,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
val relNode = TableTestUtil.toRelNode(table)
val optimizedPlan = getOptimizedPlan(
Array(relNode),
- explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withRetractTraits = false,
withRowType = false)
val result = notExpected.forall(!optimizedPlan.contains(_))
@@ -337,7 +337,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
val planBefore = SystemUtils.LINE_SEPARATOR +
FlinkRelOptUtil.toString(
relNode,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = withRowType)
assertEqualsOrExpand("planBefore", planBefore)
}
@@ -351,7 +351,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
val table = getTableEnv.sqlQuery(sql)
doVerifyPlan(
table,
- explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = false,
withRetractTraits = false,
printResource = true,
@@ -390,7 +390,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
val planBefore = SystemUtils.LINE_SEPARATOR +
FlinkRelOptUtil.toString(
relNode,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = withRowType)
assertEqualsOrExpand("planBefore", planBefore)
}
@@ -551,7 +551,7 @@ abstract class TableTestUtil(
def verifyPlan(): Unit = {
doVerifyPlan(
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRowType = false,
withRetractTraits = false,
printPlanBefore = true)
@@ -581,7 +581,7 @@ abstract class TableTestUtil(
val planBefore = new StringBuilder
relNodes.foreach { sink =>
planBefore.append(System.lineSeparator)
- planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.EXPPLAN_ATTRIBUTES))
+ planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.DIGEST_ATTRIBUTES))
}
assertEqualsOrExpand("planBefore", planBefore.toString())
}
@@ -686,7 +686,7 @@ case class StreamTableTestUtil(
def verifyPlanWithTrait(): Unit = {
doVerifyPlan(
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRetractTraits = true,
withRowType = false,
printPlanBefore = true)
@@ -695,7 +695,7 @@ case class StreamTableTestUtil(
def verifyPlanWithTrait(sql: String): Unit = {
doVerifyPlan(
sql,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRetractTraits = true,
withRowType = false,
printPlanBefore = true)
@@ -704,7 +704,7 @@ case class StreamTableTestUtil(
def verifyPlanWithTrait(table: Table): Unit = {
doVerifyPlan(
table,
- SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ SqlExplainLevel.DIGEST_ATTRIBUTES,
withRetractTraits = true,
withRowType = false,
printPlanBefore = true)